You are viewing a plain text version of this content. The canonical version is available in the original mailing-list archive (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:36 UTC

[flink] branch release-1.10 updated (32c9731 -> c6c0839)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 32c9731  [FLINK-16694][ci] Limit number of dumped log lines
     new 15c3171  [hotfix][tests] Remove unused TestingScheduledExecutor
     new b417353  [hotfix][tests] Shutdown TaskmanagerServices
     new cbb8cb6  [FLINK-17558][tests] Add TestExecutorResource
     new 8e8dbb8  [FLINK-17558][runtime] Add Executors#newCachedThreadPool
     new cce9711  [FLINK-17558][tests] Simplify partition tracker setup
     new 0efeea3  [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
     new c6c0839  [FLINK-17558][netty] Release partitions asynchronously

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/concurrent/Executors.java |  27 +++++
 .../io/network/NettyShuffleEnvironment.java        |  15 ++-
 .../io/network/NettyShuffleServiceFactory.java     |  31 +++++-
 .../runtime/shuffle/ShuffleEnvironmentContext.java |  11 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |   7 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  16 +--
 .../TaskManagerServicesConfiguration.java          |  16 ++-
 .../flink/runtime/concurrent/ExecutorsTest.java    |  63 +++++++++++
 .../io/network/NettyShuffleEnvironmentBuilder.java |  21 +++-
 .../io/network/NettyShuffleEnvironmentTest.java    |  26 +++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  58 +++++-----
 .../TaskExecutorPartitionLifecycleTest.java        | 119 +++++++++++++++------
 .../runtime/util/TestingScheduledExecutor.java     |  62 -----------
 .../testutils/executor/TestExecutorResource.java   |  42 +++++---
 14 files changed, 358 insertions(+), 156 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java
 copy flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java => flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java (51%)


[flink] 06/07: [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0efeea3ac5d3c04b830e87209b3e7357c8826931
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:13:23 2020 +0200

    [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 25 ++++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 4f6ee44..81b651f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -244,6 +244,24 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
+		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
+		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
+		partitionTrackerSetup.accept(partitionTracker);
+
+		internalTestPartitionRelease(
+			partitionTracker,
+			new NettyShuffleEnvironmentBuilder().build(),
+			startTrackingFuture,
+			testAction
+		);
+	}
+
+	private void internalTestPartitionRelease(
+			TaskExecutorPartitionTracker partitionTracker,
+			ShuffleEnvironment<?, ?> shuffleEnvironment,
+			CompletableFuture<ResultPartitionID> startTrackingFuture,
+			TestAction testAction) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
 			PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -276,8 +294,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
-		final ShuffleEnvironment<?, ?> shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
-
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
 			.setTaskStateManager(localStateStoresManager)
@@ -301,11 +317,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			})
 			.build();
 
-		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
-		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
-		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-		partitionTrackerSetup.accept(partitionTracker);
-
 		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
 		final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();


[flink] 07/07: [FLINK-17558][netty] Release partitions asynchronously

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6c0839d40571bb9ad47b8dbbec0b0ea14615372
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue May 19 16:58:22 2020 +0200

    [FLINK-17558][netty] Release partitions asynchronously
---
 .../io/network/NettyShuffleEnvironment.java        | 15 ++++--
 .../io/network/NettyShuffleServiceFactory.java     | 31 +++++++++++--
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  7 ++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 16 ++++---
 .../TaskManagerServicesConfiguration.java          | 16 ++++++-
 .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++-
 .../io/network/NettyShuffleEnvironmentTest.java    | 26 +++++++++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  2 +-
 .../TaskExecutorPartitionLifecycleTest.java        | 53 ++++++++++++++++++++++
 10 files changed, 177 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 2d2d6f3..5e7c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -56,6 +56,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -94,6 +95,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	private final SingleInputGateFactory singleInputGateFactory;
 
+	private final Executor ioExecutor;
+
 	private boolean isClosed;
 
 	NettyShuffleEnvironment(
@@ -104,7 +107,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 			ResultPartitionManager resultPartitionManager,
 			FileChannelManager fileChannelManager,
 			ResultPartitionFactory resultPartitionFactory,
-			SingleInputGateFactory singleInputGateFactory) {
+			SingleInputGateFactory singleInputGateFactory,
+			Executor ioExecutor) {
 		this.taskExecutorResourceId = taskExecutorResourceId;
 		this.config = config;
 		this.networkBufferPool = networkBufferPool;
@@ -114,6 +118,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 		this.fileChannelManager = fileChannelManager;
 		this.resultPartitionFactory = resultPartitionFactory;
 		this.singleInputGateFactory = singleInputGateFactory;
+		this.ioExecutor = ioExecutor;
 		this.isClosed = false;
 	}
 
@@ -148,9 +153,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	@Override
 	public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
-		for (ResultPartitionID partitionId : partitionIds) {
-			resultPartitionManager.releasePartition(partitionId, null);
-		}
+		ioExecutor.execute(() -> {
+			for (ResultPartitionID partitionId : partitionIds) {
+				resultPartitionManager.releasePartition(partitionId, null);
+			}
+		});
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index afbaba2..f065122 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 
+import java.util.concurrent.Executor;
+
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			networkConfig,
 			shuffleEnvironmentContext.getTaskExecutorResourceId(),
 			shuffleEnvironmentContext.getEventPublisher(),
-			shuffleEnvironmentContext.getParentMetricGroup());
+			shuffleEnvironmentContext.getParentMetricGroup(),
+			shuffleEnvironmentContext.getIoExecutor());
 	}
 
 	@VisibleForTesting
@@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			NettyShuffleEnvironmentConfiguration config,
 			ResourceID taskExecutorResourceId,
 			TaskEventPublisher taskEventPublisher,
-			MetricGroup metricGroup) {
+			MetricGroup metricGroup,
+			Executor ioExecutor) {
+		return createNettyShuffleEnvironment(
+			config,
+			taskExecutorResourceId,
+			taskEventPublisher,
+			new ResultPartitionManager(),
+			metricGroup,
+			ioExecutor);
+	}
+
+	@VisibleForTesting
+	static NettyShuffleEnvironment createNettyShuffleEnvironment(
+			NettyShuffleEnvironmentConfiguration config,
+			ResourceID taskExecutorResourceId,
+			TaskEventPublisher taskEventPublisher,
+			ResultPartitionManager resultPartitionManager,
+			MetricGroup metricGroup,
+			Executor ioExecutor) {
 		checkNotNull(config);
 		checkNotNull(taskExecutorResourceId);
 		checkNotNull(taskEventPublisher);
+		checkNotNull(resultPartitionManager);
 		checkNotNull(metricGroup);
 
 		NettyConfig nettyConfig = config.nettyConfig();
 
-		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
-
 		FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
 
 		ConnectionManager connectionManager = nettyConfig != null ?
@@ -125,6 +145,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			resultPartitionManager,
 			fileChannelManager,
 			resultPartitionFactory,
-			singleInputGateFactory);
+			singleInputGateFactory,
+			ioExecutor);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index 7863f18..3116372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext {
 	private final TaskEventPublisher eventPublisher;
 	private final MetricGroup parentMetricGroup;
 
+	private final Executor ioExecutor;
+
 	public ShuffleEnvironmentContext(
 			Configuration configuration,
 			ResourceID taskExecutorResourceId,
@@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext {
 			boolean localCommunicationOnly,
 			InetAddress hostAddress,
 			TaskEventPublisher eventPublisher,
-			MetricGroup parentMetricGroup) {
+			MetricGroup parentMetricGroup,
+			Executor ioExecutor) {
 		this.configuration = checkNotNull(configuration);
 		this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
 		this.networkMemorySize = networkMemorySize;
@@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext {
 		this.hostAddress = checkNotNull(hostAddress);
 		this.eventPublisher = checkNotNull(eventPublisher);
 		this.parentMetricGroup = checkNotNull(parentMetricGroup);
+		this.ioExecutor = ioExecutor;
 	}
 
 	public Configuration getConfiguration() {
@@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext {
 	public MetricGroup getParentMetricGroup() {
 		return parentMetricGroup;
 	}
+
+	public Executor getIoExecutor() {
+		return ioExecutor;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 019d186..7f66d03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -31,6 +31,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
@@ -373,10 +374,14 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			resourceID,
 			taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+		final ExecutorService ioExecutor = Executors.newCachedThreadPool(
+			taskManagerServicesConfiguration.getNumIoThreads(),
+			new ExecutorThreadFactory("flink-taskexecutor-io"));
+
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,
 			taskManagerMetricGroup.f1,
-			rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io.
+			ioExecutor);
 
 		TaskManagerConfiguration taskManagerConfiguration =
 			TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 04c3b29..a443fb4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -208,14 +209,14 @@ public class TaskManagerServices {
 	 *
 	 * @param taskManagerServicesConfiguration task manager configuration
 	 * @param taskManagerMetricGroup metric group of the task manager
-	 * @param taskIOExecutor executor for async IO operations
+	 * @param ioExecutor executor for async IO operations
 	 * @return task manager components
 	 * @throws Exception
 	 */
 	public static TaskManagerServices fromConfiguration(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			MetricGroup taskManagerMetricGroup,
-			Executor taskIOExecutor) throws Exception {
+			ExecutorService ioExecutor) throws Exception {
 
 		// pre-start checks
 		checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -228,7 +229,8 @@ public class TaskManagerServices {
 		final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
 			taskManagerServicesConfiguration,
 			taskEventDispatcher,
-			taskManagerMetricGroup);
+			taskManagerMetricGroup,
+			ioExecutor);
 		final int dataPort = shuffleEnvironment.start();
 
 		final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -262,7 +264,7 @@ public class TaskManagerServices {
 		final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
 			taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
 			stateRootDirectoryFiles,
-			taskIOExecutor);
+			ioExecutor);
 
 		return new TaskManagerServices(
 			taskManagerLocation,
@@ -297,7 +299,8 @@ public class TaskManagerServices {
 	private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			TaskEventDispatcher taskEventDispatcher,
-			MetricGroup taskManagerMetricGroup) throws FlinkException {
+			MetricGroup taskManagerMetricGroup,
+			Executor ioExecutor) throws FlinkException {
 
 		final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
 			taskManagerServicesConfiguration.getConfiguration(),
@@ -306,7 +309,8 @@ public class TaskManagerServices {
 			taskManagerServicesConfiguration.isLocalCommunicationOnly(),
 			taskManagerServicesConfiguration.getTaskManagerAddress(),
 			taskEventDispatcher,
-			taskManagerMetricGroup);
+			taskManagerMetricGroup,
+			ioExecutor);
 
 		return ShuffleServiceLoader
 			.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 5ec28e0..480394c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
+import org.apache.flink.runtime.util.ClusterEntrypointUtils;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,8 @@ public class TaskManagerServicesConfiguration {
 
 	private final TaskExecutorResourceSpec taskExecutorResourceSpec;
 
+	private final int numIoThreads;
+
 	public TaskManagerServicesConfiguration(
 			Configuration configuration,
 			ResourceID resourceID,
@@ -85,7 +88,8 @@ public class TaskManagerServicesConfiguration {
 			TaskExecutorResourceSpec taskExecutorResourceSpec,
 			long timerServiceShutdownTimeout,
 			RetryingRegistrationConfiguration retryingRegistrationConfiguration,
-			Optional<Time> systemResourceMetricsProbingInterval) {
+			Optional<Time> systemResourceMetricsProbingInterval,
+			int numIoThreads) {
 		this.configuration = checkNotNull(configuration);
 		this.resourceID = checkNotNull(resourceID);
 
@@ -100,6 +104,7 @@ public class TaskManagerServicesConfiguration {
 		this.pageSize = pageSize;
 
 		this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+		this.numIoThreads = numIoThreads;
 
 		checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
 			"service shutdown timeout must be greater or equal to 0.");
@@ -178,6 +183,10 @@ public class TaskManagerServicesConfiguration {
 		return retryingRegistrationConfiguration;
 	}
 
+	public int getNumIoThreads() {
+		return numIoThreads;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Parsing of Flink configuration
 	// --------------------------------------------------------------------------------------------
@@ -215,6 +224,8 @@ public class TaskManagerServicesConfiguration {
 
 		final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
+		final int numIoThreads = ClusterEntrypointUtils.getPoolSize(configuration);
+
 		return new TaskManagerServicesConfiguration(
 			configuration,
 			resourceID,
@@ -229,6 +240,7 @@ public class TaskManagerServicesConfiguration {
 			taskExecutorResourceSpec,
 			timerServiceShutdownTimeout,
 			retryingRegistrationConfiguration,
-			ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+			ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
+			numIoThreads);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ec29f82..26949cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.time.Duration;
+import java.util.concurrent.Executor;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
@@ -59,6 +62,10 @@ public class NettyShuffleEnvironmentBuilder {
 
 	private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
+	private ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+
+	private Executor ioExecutor = Executors.directExecutor();
+
 	public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
 		this.taskManagerLocation = taskManagerLocation;
 		return this;
@@ -109,6 +116,16 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
+	public NettyShuffleEnvironmentBuilder setResultPartitionManager(ResultPartitionManager resultPartitionManager) {
+		this.resultPartitionManager = resultPartitionManager;
+		return this;
+	}
+
+	public NettyShuffleEnvironmentBuilder setIoExecutor(Executor ioExecutor) {
+		this.ioExecutor = ioExecutor;
+		return this;
+	}
+
 	public NettyShuffleEnvironment build() {
 		return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
 			new NettyShuffleEnvironmentConfiguration(
@@ -128,6 +145,8 @@ public class NettyShuffleEnvironmentBuilder {
 				compressionCodec),
 			taskManagerLocation,
 			new TaskEventDispatcher(),
-			metricGroup);
+			resultPartitionManager,
+			metricGroup,
+			ioExecutor);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index b1a23a4..c0b094f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -40,6 +43,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
@@ -99,6 +104,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
+	@Test
+	public void testSlowIODoesNotBlockRelease() throws Exception {
+		BlockerSync sync = new BlockerSync();
+		ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+			@Override
+			public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+				sync.blockNonInterruptible();
+				super.releasePartition(partitionId, cause);
+			}
+		};
+
+		NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+			.setResultPartitionManager(blockingResultPartitionManager)
+			.setIoExecutor(Executors.newFixedThreadPool(1))
+			.build();
+
+		shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new ResultPartitionID()));
+		sync.awaitBlocker();
+		sync.releaseBlocker();
+	}
+
 	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(bufferPoolSize)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 49d6fb4..bb0079c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -226,6 +226,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 		return TaskManagerServices.fromConfiguration(
 			config,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			Executors.directExecutor());
+			Executors.newDirectExecutorService());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 81b651f..66de0d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -37,11 +37,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -61,6 +65,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.TriConsumer;
@@ -69,6 +74,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -81,6 +87,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.StreamSupport;
 
@@ -107,6 +114,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	@Rule
 	public final TemporaryFolder tmp = new TemporaryFolder();
 
+	@ClassRule
+	public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE =
+		new TestExecutorResource(() -> java.util.concurrent.Executors.newFixedThreadPool(1));
+
 	@Before
 	public void setup() {
 		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -243,6 +254,58 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		);
 	}
 
+	/**
+	 * Tests that a partition release which blocks inside the {@link ResultPartitionManager} does not block the
+	 * TaskExecutor, i.e. the TaskExecutor keeps answering RPCs while the release is stuck.
+	 */
+	@Test
+	public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception {
+		final BlockerSync sync = new BlockerSync();
+		final ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+			@Override
+			public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+				// park the releasing thread until the test releases the blocker
+				sync.blockNonInterruptible();
+				super.releasePartition(partitionId, cause);
+			}
+		};
+
+		// hand the class-level single-threaded executor to the shuffle environment as its IO executor
+		final NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+			.setResultPartitionManager(blockingResultPartitionManager)
+			.setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor())
+			.build();
+
+		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+		final TaskExecutorPartitionTracker partitionTracker = new TaskExecutorPartitionTrackerImpl(shuffleEnvironment) {
+			@Override
+			public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) {
+				super.startTrackingPartition(producingJobId, partitionInfo);
+				startTrackingFuture.complete(partitionInfo.getResultPartitionId());
+			}
+		};
+
+		try {
+			internalTestPartitionRelease(
+				partitionTracker,
+				shuffleEnvironment,
+				startTrackingFuture,
+				(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
+					taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet());
+
+					// wait until the release is actually stuck inside the blocking ResultPartitionManager
+					sync.awaitBlocker();
+
+					// execute some operation to check whether the TaskExecutor is blocked
+					taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS);
+				}
+			);
+		} finally {
+			// always unblock the parked release, even if the test failed
+			sync.releaseBlocker();
+		}
+	}
+
 	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
 		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
 		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();


[flink] 01/07: [hotfix][tests] Remove unused TestingScheduledExecutor

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 15c3171b909e623c9c0ddc21581bab1532f4f04b
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 12:33:31 2020 +0200

    [hotfix][tests] Remove unused TestingScheduledExecutor
---
 .../runtime/util/TestingScheduledExecutor.java     | 62 ----------------------
 1 file changed, 62 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java
deleted file mode 100644
index d9cfb11..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingScheduledExecutor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.util.ExecutorUtils;
-
-import org.junit.rules.ExternalResource;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Provides a scheduled executor for testing which is automatically shut down after the test.
- */
-public class TestingScheduledExecutor extends ExternalResource {
-
-	private long shutdownTimeoutMillis;
-	private ScheduledExecutor scheduledExecutor;
-	private ScheduledExecutorService innerExecutorService;
-
-	public TestingScheduledExecutor() {
-		this(500L);
-	}
-
-	public TestingScheduledExecutor(long shutdownTimeoutMillis) {
-		this.shutdownTimeoutMillis = shutdownTimeoutMillis;
-	}
-
-	@Override
-	public void before() {
-		this.innerExecutorService = Executors.newSingleThreadScheduledExecutor();
-		this.scheduledExecutor = new ScheduledExecutorServiceAdapter(innerExecutorService);
-	}
-
-	@Override
-	protected void after() {
-		ExecutorUtils.gracefulShutdown(shutdownTimeoutMillis, TimeUnit.MILLISECONDS, innerExecutorService);
-	}
-
-	protected ScheduledExecutor getScheduledExecutor() {
-		return scheduledExecutor;
-	}
-}


[flink] 05/07: [FLINK-17558][tests] Simplify partition tracker setup

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cce971167d4de726dcdb7c9a6e4f0f648fb61266
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:11:09 2020 +0200

    [FLINK-17558][tests] Simplify partition tracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 43 ++++++++--------------
 1 file changed, 16 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index b8f43d0..4f6ee44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -206,14 +206,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
+		final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
-				partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete);
-				return releasePartitionsForJobFuture;
-			},
-			(jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsForJobFuture) -> {
-
+			partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete),
+			(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
 				taskExecutorGateway.disconnectJobManager(jobId, new Exception("test"));
 
 				assertThat(releasePartitionsForJobFuture.get(), equalTo(jobId));
@@ -223,13 +219,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionReleaseAfterReleaseCall() throws Exception {
+		final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
-				partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
-				return releasePartitionsFuture;
-			},
-			(jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+			partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete),
+			(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
 				taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet());
 
 				assertThat(releasePartitionsFuture.get(), hasItems(partitionId));
@@ -239,13 +232,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionPromotion() throws Exception {
+		final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
-				partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete);
-				return releasePartitionsFuture;
-			},
-			(jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+			partitionTracker -> partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete),
+			(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
 				taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), Collections.singleton(partitionId));
 
 				assertThat(releasePartitionsFuture.get(), hasItems(partitionId));
@@ -253,7 +243,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		);
 	}
 
-	private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
 			PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -314,7 +304,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
 		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
 		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-		C partitionTrackerSetupResult = partitionTrackerSetup.accept(partitionTracker);
+		partitionTrackerSetup.accept(partitionTracker);
 
 		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
@@ -394,8 +384,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 				jobId,
 				taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID(),
 				taskExecutor,
-				taskExecutorGateway,
-				partitionTrackerSetupResult);
+				taskExecutorGateway);
 		} finally {
 			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
 		}
@@ -447,12 +436,12 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	@FunctionalInterface
-	private interface PartitionTrackerSetup<C> {
-		C accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
+	private interface PartitionTrackerSetup { // configures the tracker before the TaskExecutor is created
+		void accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
 	}
 
 	@FunctionalInterface
-	private interface TestAction<C> {
-		void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C partitionTrackerSetupResult) throws Exception;
+	private interface TestAction { // test-specific interaction with the TaskExecutor and the produced partition
+		void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws Exception;
 	}
 }


[flink] 03/07: [FLINK-17558][tests] Add TestExecutorResource

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cbb8cb65db71a600d551ba0ceb9bd413f47a5c3a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 28 14:45:13 2020 +0200

    [FLINK-17558][tests] Add TestExecutorResource
---
 .../testutils/executor/TestExecutorResource.java   | 55 ++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java
new file mode 100644
index 0000000..b7619fc
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.executor;
+
+import org.junit.rules.ExternalResource;
+
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+/**
+ * Resource which starts/stops an {@link ExecutorService} for testing purposes.
+ *
+ * <p>The executor is obtained from the given factory when the resource is started and shut down when it is
+ * stopped; in between it can be accessed via {@link #getExecutor()}.
+ */
+public class TestExecutorResource extends ExternalResource {
+
+	private final Supplier<ExecutorService> serviceFactory;
+
+	/** The managed executor; only non-null while the resource is active. */
+	private ExecutorService executorService;
+
+	/**
+	 * Creates a resource whose executor is obtained from the given factory each time the resource is started.
+	 *
+	 * @param serviceFactory factory creating the executor to manage
+	 */
+	public TestExecutorResource(Supplier<ExecutorService> serviceFactory) {
+		this.serviceFactory = Objects.requireNonNull(serviceFactory, "serviceFactory");
+	}
+
+	@Override
+	protected void before() throws Throwable {
+		executorService = Objects.requireNonNull(serviceFactory.get(), "serviceFactory returned null");
+	}
+
+	/**
+	 * Returns the managed executor. Only an {@link Executor} is exposed since this resource is in charge of
+	 * its life cycle.
+	 *
+	 * @return the executor managed by this resource
+	 * @throws IllegalStateException if the resource has not been started yet or has already been stopped
+	 */
+	public Executor getExecutor() {
+		if (executorService == null) {
+			throw new IllegalStateException("The executor is only available while the resource is active.");
+		}
+		return executorService;
+	}
+
+	@Override
+	protected void after() {
+		if (executorService != null) {
+			executorService.shutdown();
+			// drop the reference so that the shut down executor is no longer handed out
+			executorService = null;
+		}
+	}
+}


[flink] 02/07: [hotfix][tests] Shutdown TaskmanagerServices

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b417353072b84c272fee818d4bfcead90c3a565c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 13:00:10 2020 +0200

    [hotfix][tests] Shutdown TaskmanagerServices
---
 .../TaskExecutorLocalStateStoresManagerTest.java   | 56 ++++++++++++----------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 13e5e69..49d6fb4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -69,23 +69,27 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		TaskManagerServices taskManagerServices = createTaskManagerServices(createTaskManagerServiceConfiguration(config));
 
-		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
-
-		// verify configured directories for local state
-		String[] split = rootDirString.split(",");
-		File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
-		for (int i = 0; i < split.length; ++i) {
-			Assert.assertEquals(
-				new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-				rootDirectories[i]);
-		}
+		try {
+			TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
+
+			// verify configured directories for local state
+			String[] split = rootDirString.split(",");
+			File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
+			for (int i = 0; i < split.length; ++i) {
+				Assert.assertEquals(
+					new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+					rootDirectories[i]);
+			}
 
-		// verify local recovery mode
-		Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
+			// verify local recovery mode
+			Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
 
-		Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
-		for (File rootDirectory : rootDirectories) {
-			FileUtils.deleteFileOrDirectory(rootDirectory);
+			Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
+			for (File rootDirectory : rootDirectories) {
+				FileUtils.deleteFileOrDirectory(rootDirectory);
+			}
+		} finally {
+			taskManagerServices.shutDown();
 		}
 	}
 
@@ -102,18 +106,22 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		TaskManagerServices taskManagerServices = createTaskManagerServices(taskManagerServicesConfiguration);
 
-		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
+		try {
+			TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
 
-		String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
-		File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();
+			String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
+			File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();
 
-		for (int i = 0; i < tmpDirPaths.length; ++i) {
-			Assert.assertEquals(
-				new File(tmpDirPaths[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-				localStateRootDirectories[i]);
-		}
+			for (int i = 0; i < tmpDirPaths.length; ++i) {
+				Assert.assertEquals(
+					new File(tmpDirPaths[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+					localStateRootDirectories[i]);
+			}
 
-		Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
+			Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
+		} finally {
+			taskManagerServices.shutDown();
+		}
 	}
 
 	/**


[flink] 04/07: [FLINK-17558][runtime] Add Executors#newCachedThreadPool

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e8dbb8dbd51fa896ed7258cedb955931bc0e03d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 12:59:28 2020 +0200

    [FLINK-17558][runtime] Add Executors#newCachedThreadPool
---
 .../apache/flink/runtime/concurrent/Executors.java | 27 ++++++++++
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 41d9a32..c758752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
@@ -61,6 +67,27 @@ public class Executors {
 	}
 
 	/**
+	 * Returns a new cached thread pool with the desired maximum size.
+	 *
+	 * <p>Threads are started on demand, up to {@code maxPoolSize}, and terminate after being idle for 60 seconds.
+	 * Unlike {@link java.util.concurrent.Executors#newCachedThreadPool()}, tasks are queued in an unbounded
+	 * {@link LinkedBlockingQueue} instead of being rejected if the pool is saturated. The core size equals
+	 * {@code maxPoolSize} because a {@link ThreadPoolExecutor} never grows beyond its core size while its
+	 * queue accepts tasks; with a core size of 0 the pool would be limited to a single thread.
+	 *
+	 * @see ExecutorThreadFactory
+	 * @param maxPoolSize maximum size of the thread pool
+	 * @param threadFactory thread factory to use
+	 * @return new cached thread pool
+	 */
+	public static ExecutorService newCachedThreadPool(int maxPoolSize, ThreadFactory threadFactory) {
+		final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(maxPoolSize, maxPoolSize,
+			60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
+		threadPool.allowCoreThreadTimeOut(true); // idle threads terminate, so an unused pool shrinks to zero
+		return threadPool;
+	}
+
+	/**
 	 * Direct execution context.
 	 */
 	private static class DirectExecutionContext implements ExecutionContext {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
new file mode 100644
index 0000000..e3be776
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Tests for {@link Executors}.
+ */
+public class ExecutorsTest extends TestLogger {
+
+	@Rule
+	public final TestExecutorResource executorResource = new TestExecutorResource(
+		() -> Executors.newCachedThreadPool(1, new ExecutorThreadFactory()));
+
+	/**
+	 * Tests that the {@link ExecutorService} returned by {@link Executors#newCachedThreadPool(int, ThreadFactory)}
+	 * queues tasks exceeding the active thread count and executes them once a thread becomes available. In a prior
+	 * implementation the executor used a synchronous queue, rejecting tasks with an exception if no thread was available.
+	 */
+	@Test
+	public void testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws Exception {
+		final Executor executor = executorResource.getExecutor();
+
+		final BlockerSync sync = new BlockerSync();
+		final CompletableFuture<Void> queuedTaskExecuted = new CompletableFuture<>();
+		try {
+			// submit the first blocking task, which should block the single pool thread
+			executor.execute(sync::blockNonInterruptible);
+
+			// the thread is now blocked
+			sync.awaitBlocker();
+
+			// this task should not be rejected, but queued until the pool thread becomes available
+			executor.execute(() -> queuedTaskExecuted.complete(null));
+		} finally {
+			sync.releaseBlocker();
+		}
+
+		// once unblocked, the pool thread must eventually run the queued task
+		queuedTaskExecuted.get();
+	}
+}