You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/09/16 11:14:12 UTC

[flink] branch master updated: [FLINK-19241] Forward ioExecutor into ResourceManagers

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d42f84  [FLINK-19241] Forward ioExecutor into ResourceManagers
6d42f84 is described below

commit 6d42f84e1b62d9a4560f5fe79db12af6437bb751
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Tue Sep 15 13:45:41 2020 +0200

    [FLINK-19241] Forward ioExecutor into ResourceManagers
    
    This closes #13399
---
 .../runtime/clusterframework/MesosResourceManager.java     | 11 +++++++----
 .../clusterframework/MesosResourceManagerFactory.java      |  8 ++++++--
 .../runtime/clusterframework/MesosResourceManagerTest.java |  4 +++-
 .../DefaultDispatcherResourceManagerComponentFactory.java  |  3 ++-
 .../flink/runtime/resourcemanager/ResourceManager.java     | 11 ++++++++---
 .../runtime/resourcemanager/ResourceManagerFactory.java    | 11 ++++++++---
 .../runtime/resourcemanager/StandaloneResourceManager.java |  7 +++++--
 .../resourcemanager/StandaloneResourceManagerFactory.java  |  8 ++++++--
 .../resourcemanager/active/ActiveResourceManager.java      |  7 +++++--
 .../active/ActiveResourceManagerFactory.java               | 14 ++++++++++----
 .../active/LegacyActiveResourceManager.java                |  7 +++++--
 .../active/LegacyActiveResourceManagerFactory.java         |  8 ++++++--
 .../runtime/resourcemanager/ResourceManagerHATest.java     |  4 +++-
 .../resourcemanager/ResourceManagerJobMasterTest.java      |  4 +++-
 .../resourcemanager/ResourceManagerTaskExecutorTest.java   |  4 +++-
 .../resourcemanager/StandaloneResourceManagerTest.java     |  4 +++-
 .../runtime/resourcemanager/TestingResourceManager.java    |  5 ++++-
 .../resourcemanager/active/ActiveResourceManagerTest.java  |  4 +++-
 .../java/org/apache/flink/yarn/YarnResourceManager.java    |  7 +++++--
 .../flink/yarn/entrypoint/YarnResourceManagerFactory.java  |  8 ++++++--
 .../org/apache/flink/yarn/YarnResourceManagerTest.java     |  4 +++-
 21 files changed, 104 insertions(+), 39 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 458ce3c..416ed7b 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -86,6 +86,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -164,7 +165,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			MesosTaskManagerParameters taskManagerParameters,
 			ContainerSpecification taskManagerContainerSpec,
 			@Nullable String webUiUrl,
-			ResourceManagerMetricGroup resourceManagerMetricGroup) {
+			ResourceManagerMetricGroup resourceManagerMetricGroup,
+			Executor ioExecutor) {
 		super(
 			rpcService,
 			resourceId,
@@ -176,7 +178,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			clusterInformation,
 			fatalErrorHandler,
 			resourceManagerMetricGroup,
-			AkkaUtils.getTimeoutAsTime(flinkConfig));
+			AkkaUtils.getTimeoutAsTime(flinkConfig),
+			ioExecutor);
 
 		this.mesosServices = Preconditions.checkNotNull(mesosServices);
 		this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
@@ -235,7 +238,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	protected void initialize() throws ResourceManagerException {
 		// create and start the worker store
 		try {
-			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, getRpcService().getExecutor());
+			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, ioExecutor);
 			workerStore.start();
 		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to initialize the worker store.", e);
@@ -332,7 +335,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			} catch (final Exception e) {
 				throw new CompletionException(new ResourceManagerException(e));
 			}
-		}, getRpcService().getExecutor());
+		}, ioExecutor);
 	}
 
 	/**
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
index 3d1c4f3..462f519 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
  */
@@ -73,7 +75,8 @@ public class MesosResourceManagerFactory extends LegacyActiveResourceManagerFact
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			ResourceManagerMetricGroup resourceManagerMetricGroup,
-			ResourceManagerRuntimeServices resourceManagerRuntimeServices) throws Exception {
+			ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+			Executor ioExecutor) throws Exception {
 
 		final MesosTaskManagerParameters taskManagerParameters = MesosUtils.createTmParameters(configuration, LOG);
 		final ContainerSpecification taskManagerContainerSpec = MesosUtils.createContainerSpec(configuration);
@@ -94,7 +97,8 @@ public class MesosResourceManagerFactory extends LegacyActiveResourceManagerFact
 			taskManagerParameters,
 			taskManagerContainerSpec,
 			webInterfaceUrl,
-			resourceManagerMetricGroup);
+			resourceManagerMetricGroup,
+			ioExecutor);
 	}
 
 	@Override
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index b285516..ab22dbd 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -104,6 +104,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -194,7 +195,8 @@ public class MesosResourceManagerTest extends TestLogger {
 				taskManagerParameters,
 				taskManagerContainerSpec,
 				null,
-				resourceManagerMetricGroup);
+				resourceManagerMetricGroup,
+				ForkJoinPool.commonPool());
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index 3212d00..1386162 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -174,7 +174,8 @@ public class DefaultDispatcherResourceManagerComponentFactory implements Dispatc
 				new ClusterInformation(hostname, blobServer.getPort()),
 				webMonitorEndpoint.getRestBaseUrl(),
 				metricRegistry,
-				hostname);
+				hostname,
+				ioExecutor);
 
 			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint, ioExecutor);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3c62248..4f693b5 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -84,6 +84,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
@@ -140,6 +141,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
 	private final ResourceManagerMetricGroup resourceManagerMetricGroup;
 
+	protected final Executor ioExecutor;
+
 	/** The service to elect a ResourceManager leader. */
 	private LeaderElectionService leaderElectionService;
 
@@ -168,7 +171,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			ClusterInformation clusterInformation,
 			FatalErrorHandler fatalErrorHandler,
 			ResourceManagerMetricGroup resourceManagerMetricGroup,
-			Time rpcTimeout) {
+			Time rpcTimeout,
+			Executor ioExecutor) {
 
 		super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null);
 
@@ -197,6 +201,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 					throw new CompletionException(throwable);
 				})
 		);
+		this.ioExecutor = ioExecutor;
 	}
 
 
@@ -376,7 +381,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 					return registrationResponse;
 				}
 			},
-			getRpcService().getExecutor());
+			ioExecutor);
 	}
 
 	@Override
@@ -976,7 +981,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 					leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
 				}
 			},
-			getRpcService().getExecutor());
+			ioExecutor);
 
 		confirmationFuture.whenComplete(
 			(Void ignored, Throwable throwable) -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index 8d68faf..87cc526 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -33,6 +33,8 @@ import org.apache.flink.util.ConfigurationException;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * {@link ResourceManager} factory.
  *
@@ -50,7 +52,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			MetricRegistry metricRegistry,
-			String hostname) throws Exception {
+			String hostname,
+			Executor ioExecutor) throws Exception {
 
 		final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
 		final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);
@@ -68,7 +71,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
 			clusterInformation,
 			webInterfaceUrl,
 			resourceManagerMetricGroup,
-			resourceManagerRuntimeServices);
+			resourceManagerRuntimeServices,
+			ioExecutor);
 	}
 
 	protected abstract ResourceManager<T> createResourceManager(
@@ -81,7 +85,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			ResourceManagerMetricGroup resourceManagerMetricGroup,
-			ResourceManagerRuntimeServices resourceManagerRuntimeServices) throws Exception;
+			ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+			Executor ioExecutor) throws Exception;
 
 	private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
 			Configuration configuration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index a6eb0a6..608032c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -59,7 +60,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			FatalErrorHandler fatalErrorHandler,
 			ResourceManagerMetricGroup resourceManagerMetricGroup,
 			Time startupPeriodTime,
-			Time rpcTimeout) {
+			Time rpcTimeout,
+			Executor ioExecutor) {
 		super(
 			rpcService,
 			resourceId,
@@ -71,7 +73,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			clusterInformation,
 			fatalErrorHandler,
 			resourceManagerMetricGroup,
-			rpcTimeout);
+			rpcTimeout,
+			ioExecutor);
 		this.startupPeriodTime = Preconditions.checkNotNull(startupPeriodTime);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index 135276b..528f548 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager}.
  */
@@ -64,7 +66,8 @@ public final class StandaloneResourceManagerFactory extends ResourceManagerFacto
 		ClusterInformation clusterInformation,
 		@Nullable String webInterfaceUrl,
 		ResourceManagerMetricGroup resourceManagerMetricGroup,
-		ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
+		ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+		Executor ioExecutor) {
 
 		final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
 
@@ -80,7 +83,8 @@ public final class StandaloneResourceManagerFactory extends ResourceManagerFacto
 			fatalErrorHandler,
 			resourceManagerMetricGroup,
 			standaloneClusterStartupPeriodTime,
-			AkkaUtils.getTimeoutAsTime(configuration));
+			AkkaUtils.getTimeoutAsTime(configuration),
+			ioExecutor);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index 656d5b4..c98caa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -50,6 +50,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -87,7 +88,8 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
 			JobLeaderIdService jobLeaderIdService,
 			ClusterInformation clusterInformation,
 			FatalErrorHandler fatalErrorHandler,
-			ResourceManagerMetricGroup resourceManagerMetricGroup) {
+			ResourceManagerMetricGroup resourceManagerMetricGroup,
+			Executor ioExecutor) {
 		super(
 				rpcService,
 				resourceId,
@@ -99,7 +101,8 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
 				clusterInformation,
 				fatalErrorHandler,
 				resourceManagerMetricGroup,
-				AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)));
+				AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)),
+				ioExecutor);
 
 		this.flinkConfig = flinkConfig;
 		this.resourceManagerDriver = resourceManagerDriver;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
index 1007165..f0c68a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * Factory class for creating {@link ActiveResourceManager} with various implementations of {@link ResourceManagerDriver}.
  */
@@ -54,7 +56,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			MetricRegistry metricRegistry,
-			String hostname) throws Exception {
+			String hostname,
+			Executor ioExecutor) throws Exception {
 		return super.createResourceManager(
 				createActiveResourceManagerConfiguration(configuration),
 				resourceId,
@@ -65,7 +68,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
 				clusterInformation,
 				webInterfaceUrl,
 				metricRegistry,
-				hostname);
+				hostname,
+				ioExecutor);
 	}
 
 	private Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
@@ -87,7 +91,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			ResourceManagerMetricGroup resourceManagerMetricGroup,
-			ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
+			ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+			Executor ioExecutor) {
 
 		return new ActiveResourceManager<>(
 				createResourceManagerDriver(configuration),
@@ -101,7 +106,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
 				resourceManagerRuntimeServices.getJobLeaderIdService(),
 				clusterInformation,
 				fatalErrorHandler,
-				resourceManagerMetricGroup);
+				resourceManagerMetricGroup,
+				ioExecutor);
 	}
 
 	protected abstract ResourceManagerDriver<WorkerType> createResourceManagerDriver(Configuration configuration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java
index c6fba52..6142ed1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for {@link ResourceManager} implementations which contains some common variables and methods.
@@ -74,7 +75,8 @@ public abstract class LegacyActiveResourceManager<WorkerType extends ResourceIDR
 			JobLeaderIdService jobLeaderIdService,
 			ClusterInformation clusterInformation,
 			FatalErrorHandler fatalErrorHandler,
-			ResourceManagerMetricGroup resourceManagerMetricGroup) {
+			ResourceManagerMetricGroup resourceManagerMetricGroup,
+			Executor ioExecutor) {
 		super(
 			rpcService,
 			resourceId,
@@ -86,7 +88,8 @@ public abstract class LegacyActiveResourceManager<WorkerType extends ResourceIDR
 			clusterInformation,
 			fatalErrorHandler,
 			resourceManagerMetricGroup,
-			AkkaUtils.getTimeoutAsTime(flinkConfig));
+			AkkaUtils.getTimeoutAsTime(flinkConfig),
+			ioExecutor);
 
 		this.flinkConfig = flinkConfig;
 		this.env = env;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java
index bfc95ad..2244930 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * Resource manager factory which creates active {@link ResourceManager} implementations.
  *
@@ -56,7 +58,8 @@ public abstract class LegacyActiveResourceManagerFactory<T extends ResourceIDRet
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			MetricRegistry metricRegistry,
-			String hostname) throws Exception {
+			String hostname,
+			Executor ioExecutor) throws Exception {
 		return super.createResourceManager(
 			createActiveResourceManagerConfiguration(configuration),
 			resourceId,
@@ -67,7 +70,8 @@ public abstract class LegacyActiveResourceManagerFactory<T extends ResourceIDRet
 			clusterInformation,
 			webInterfaceUrl,
 			metricRegistry,
-			hostname);
+			hostname,
+			ioExecutor);
 	}
 
 	private Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 19d4bb8..807dbfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * ResourceManager HA test, including grant leadership and revoke leadership.
@@ -101,7 +102,8 @@ public class ResourceManagerHATest extends TestLogger {
 				testingFatalErrorHandler,
 				UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
 				Time.minutes(5L),
-				RpcUtils.INF_TIMEOUT) {
+				RpcUtils.INF_TIMEOUT,
+				ForkJoinPool.commonPool()) {
 
 				@Override
 				public void revokeLeadership() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index b3814fa..efeb6ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -54,6 +54,7 @@ import org.junit.Test;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertTrue;
@@ -153,7 +154,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			testingFatalErrorHandler,
 			UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
 			Time.minutes(5L),
-			RpcUtils.INF_TIMEOUT);
+			RpcUtils.INF_TIMEOUT,
+			ForkJoinPool.commonPool());
 
 		resourceManager.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index d95b88f..a9abd0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -63,6 +63,7 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -165,7 +166,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				fatalErrorHandler,
 				UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
 				Time.minutes(5L),
-				RpcUtils.INF_TIMEOUT);
+				RpcUtils.INF_TIMEOUT,
+				ForkJoinPool.commonPool());
 
 		resourceManager.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
index 1583e47..01e68be 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -157,7 +158,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
 				fatalErrorHandler,
 				resourceManagerMetricGroup,
 				startupPeriodTime,
-				RpcUtils.INF_TIMEOUT);
+				RpcUtils.INF_TIMEOUT,
+				ForkJoinPool.commonPool());
 			this.rmServices = rmServices;
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index e282a58..112171f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.ForkJoinPool;
+
 /**
  * Simple {@link ResourceManager} implementation for testing purposes.
  */
@@ -59,7 +61,8 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
 			new ClusterInformation("localhost", 1234),
 			fatalErrorHandler,
 			resourceManagerMetricGroup,
-			RpcUtils.INF_TIMEOUT);
+			RpcUtils.INF_TIMEOUT,
+			ForkJoinPool.commonPool());
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 17c7a7b..2a6fb00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -53,6 +53,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -427,7 +428,8 @@ public class ActiveResourceManagerTest extends TestLogger {
 					rmServices.jobLeaderIdService,
 					new ClusterInformation("localhost", 1234),
 					fatalErrorHandler,
-					UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
+					UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
+					ForkJoinPool.commonPool());
 
 			activeResourceManager.start();
 			rmServices.grantLeadership();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 2c6b335..38a1cdb 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -79,6 +79,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
 /**
@@ -135,7 +136,8 @@ public class YarnResourceManager extends LegacyActiveResourceManager<YarnWorkerN
 			ClusterInformation clusterInformation,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String webInterfaceUrl,
-			ResourceManagerMetricGroup resourceManagerMetricGroup) {
+			ResourceManagerMetricGroup resourceManagerMetricGroup,
+			Executor ioExecutor) {
 		super(
 			flinkConfig,
 			env,
@@ -148,7 +150,8 @@ public class YarnResourceManager extends LegacyActiveResourceManager<YarnWorkerN
 			jobLeaderIdService,
 			clusterInformation,
 			fatalErrorHandler,
-			resourceManagerMetricGroup);
+			resourceManagerMetricGroup,
+			ioExecutor);
 		this.yarnConfig = new YarnConfiguration();
 		this.workerNodeMap = new ConcurrentHashMap<>();
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index c846f7d..4c5b55c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.yarn.YarnWorkerNode;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * {@link ResourceManagerFactory} implementation which creates a {@link YarnResourceManager}.
  */
@@ -62,7 +64,8 @@ public class YarnResourceManagerFactory extends LegacyActiveResourceManagerFacto
 			ClusterInformation clusterInformation,
 			@Nullable String webInterfaceUrl,
 			ResourceManagerMetricGroup resourceManagerMetricGroup,
-			ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
+			ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+			Executor ioExecutor) {
 
 		return new YarnResourceManager(
 			rpcService,
@@ -77,7 +80,8 @@ public class YarnResourceManagerFactory extends LegacyActiveResourceManagerFacto
 			clusterInformation,
 			fatalErrorHandler,
 			webInterfaceUrl,
-			resourceManagerMetricGroup);
+			resourceManagerMetricGroup,
+			ioExecutor);
 	}
 
 	@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 852968e..e0e3031 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -102,6 +102,7 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -209,7 +210,8 @@ public class YarnResourceManagerTest extends TestLogger {
 				clusterInformation,
 				fatalErrorHandler,
 				webInterfaceUrl,
-				resourceManagerMetricGroup);
+				resourceManagerMetricGroup,
+				ForkJoinPool.commonPool());
 			this.testingYarnNMClientAsync = new TestingYarnNMClientAsync(this);
 			this.testingYarnAMRMClientAsync = new TestingYarnAMRMClientAsync(this);
 		}