You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/09/16 11:14:12 UTC
[flink] branch master updated: [FLINK-19241] Forward ioExecutor
into ResourceManagers
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6d42f84 [FLINK-19241] Forward ioExecutor into ResourceManagers
6d42f84 is described below
commit 6d42f84e1b62d9a4560f5fe79db12af6437bb751
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Tue Sep 15 13:45:41 2020 +0200
[FLINK-19241] Forward ioExecutor into ResourceManagers
This closes #13399
---
.../runtime/clusterframework/MesosResourceManager.java | 11 +++++++----
.../clusterframework/MesosResourceManagerFactory.java | 8 ++++++--
.../runtime/clusterframework/MesosResourceManagerTest.java | 4 +++-
.../DefaultDispatcherResourceManagerComponentFactory.java | 3 ++-
.../flink/runtime/resourcemanager/ResourceManager.java | 11 ++++++++---
.../runtime/resourcemanager/ResourceManagerFactory.java | 11 ++++++++---
.../runtime/resourcemanager/StandaloneResourceManager.java | 7 +++++--
.../resourcemanager/StandaloneResourceManagerFactory.java | 8 ++++++--
.../resourcemanager/active/ActiveResourceManager.java | 7 +++++--
.../active/ActiveResourceManagerFactory.java | 14 ++++++++++----
.../active/LegacyActiveResourceManager.java | 7 +++++--
.../active/LegacyActiveResourceManagerFactory.java | 8 ++++++--
.../runtime/resourcemanager/ResourceManagerHATest.java | 4 +++-
.../resourcemanager/ResourceManagerJobMasterTest.java | 4 +++-
.../resourcemanager/ResourceManagerTaskExecutorTest.java | 4 +++-
.../resourcemanager/StandaloneResourceManagerTest.java | 4 +++-
.../runtime/resourcemanager/TestingResourceManager.java | 5 ++++-
.../resourcemanager/active/ActiveResourceManagerTest.java | 4 +++-
.../java/org/apache/flink/yarn/YarnResourceManager.java | 7 +++++--
.../flink/yarn/entrypoint/YarnResourceManagerFactory.java | 8 ++++++--
.../org/apache/flink/yarn/YarnResourceManagerTest.java | 4 +++-
21 files changed, 104 insertions(+), 39 deletions(-)
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 458ce3c..416ed7b 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -86,6 +86,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import scala.Option;
@@ -164,7 +165,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
MesosTaskManagerParameters taskManagerParameters,
ContainerSpecification taskManagerContainerSpec,
@Nullable String webUiUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
+ ResourceManagerMetricGroup resourceManagerMetricGroup,
+ Executor ioExecutor) {
super(
rpcService,
resourceId,
@@ -176,7 +178,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
- AkkaUtils.getTimeoutAsTime(flinkConfig));
+ AkkaUtils.getTimeoutAsTime(flinkConfig),
+ ioExecutor);
this.mesosServices = Preconditions.checkNotNull(mesosServices);
this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
@@ -235,7 +238,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
protected void initialize() throws ResourceManagerException {
// create and start the worker store
try {
- this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, getRpcService().getExecutor());
+ this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, ioExecutor);
workerStore.start();
} catch (Exception e) {
throw new ResourceManagerException("Unable to initialize the worker store.", e);
@@ -332,7 +335,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
} catch (final Exception e) {
throw new CompletionException(new ResourceManagerException(e));
}
- }, getRpcService().getExecutor());
+ }, ioExecutor);
}
/**
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
index 3d1c4f3..462f519 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
/**
* {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
*/
@@ -73,7 +75,8 @@ public class MesosResourceManagerFactory extends LegacyActiveResourceManagerFact
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- ResourceManagerRuntimeServices resourceManagerRuntimeServices) throws Exception {
+ ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+ Executor ioExecutor) throws Exception {
final MesosTaskManagerParameters taskManagerParameters = MesosUtils.createTmParameters(configuration, LOG);
final ContainerSpecification taskManagerContainerSpec = MesosUtils.createContainerSpec(configuration);
@@ -94,7 +97,8 @@ public class MesosResourceManagerFactory extends LegacyActiveResourceManagerFact
taskManagerParameters,
taskManagerContainerSpec,
webInterfaceUrl,
- resourceManagerMetricGroup);
+ resourceManagerMetricGroup,
+ ioExecutor);
}
@Override
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index b285516..ab22dbd 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -104,6 +104,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import scala.Option;
@@ -194,7 +195,8 @@ public class MesosResourceManagerTest extends TestLogger {
taskManagerParameters,
taskManagerContainerSpec,
null,
- resourceManagerMetricGroup);
+ resourceManagerMetricGroup,
+ ForkJoinPool.commonPool());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index 3212d00..1386162 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -174,7 +174,8 @@ public class DefaultDispatcherResourceManagerComponentFactory implements Dispatc
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
- hostname);
+ hostname,
+ ioExecutor);
final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint, ioExecutor);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3c62248..4f693b5 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -84,6 +84,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -140,6 +141,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
private final ResourceManagerMetricGroup resourceManagerMetricGroup;
+ protected final Executor ioExecutor;
+
/** The service to elect a ResourceManager leader. */
private LeaderElectionService leaderElectionService;
@@ -168,7 +171,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- Time rpcTimeout) {
+ Time rpcTimeout,
+ Executor ioExecutor) {
super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null);
@@ -197,6 +201,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
throw new CompletionException(throwable);
})
);
+ this.ioExecutor = ioExecutor;
}
@@ -376,7 +381,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
return registrationResponse;
}
},
- getRpcService().getExecutor());
+ ioExecutor);
}
@Override
@@ -976,7 +981,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
}
},
- getRpcService().getExecutor());
+ ioExecutor);
confirmationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index 8d68faf..87cc526 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -33,6 +33,8 @@ import org.apache.flink.util.ConfigurationException;
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
/**
* {@link ResourceManager} factory.
*
@@ -50,7 +52,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
MetricRegistry metricRegistry,
- String hostname) throws Exception {
+ String hostname,
+ Executor ioExecutor) throws Exception {
final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);
@@ -68,7 +71,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
clusterInformation,
webInterfaceUrl,
resourceManagerMetricGroup,
- resourceManagerRuntimeServices);
+ resourceManagerRuntimeServices,
+ ioExecutor);
}
protected abstract ResourceManager<T> createResourceManager(
@@ -81,7 +85,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- ResourceManagerRuntimeServices resourceManagerRuntimeServices) throws Exception;
+ ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+ Executor ioExecutor) throws Exception;
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
Configuration configuration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index a6eb0a6..608032c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
@@ -59,7 +60,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup,
Time startupPeriodTime,
- Time rpcTimeout) {
+ Time rpcTimeout,
+ Executor ioExecutor) {
super(
rpcService,
resourceId,
@@ -71,7 +73,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
- rpcTimeout);
+ rpcTimeout,
+ ioExecutor);
this.startupPeriodTime = Preconditions.checkNotNull(startupPeriodTime);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index 135276b..528f548 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
/**
* {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager}.
*/
@@ -64,7 +66,8 @@ public final class StandaloneResourceManagerFactory extends ResourceManagerFacto
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
+ ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+ Executor ioExecutor) {
final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
@@ -80,7 +83,8 @@ public final class StandaloneResourceManagerFactory extends ResourceManagerFacto
fatalErrorHandler,
resourceManagerMetricGroup,
standaloneClusterStartupPeriodTime,
- AkkaUtils.getTimeoutAsTime(configuration));
+ AkkaUtils.getTimeoutAsTime(configuration),
+ ioExecutor);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index 656d5b4..c98caa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -50,6 +50,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -87,7 +88,8 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
+ ResourceManagerMetricGroup resourceManagerMetricGroup,
+ Executor ioExecutor) {
super(
rpcService,
resourceId,
@@ -99,7 +101,8 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
- AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)));
+ AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)),
+ ioExecutor);
this.flinkConfig = flinkConfig;
this.resourceManagerDriver = resourceManagerDriver;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
index 1007165..f0c68a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.rpc.RpcService;
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
/**
* Factory class for creating {@link ActiveResourceManager} with various implementations of {@link ResourceManagerDriver}.
*/
@@ -54,7 +56,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
MetricRegistry metricRegistry,
- String hostname) throws Exception {
+ String hostname,
+ Executor ioExecutor) throws Exception {
return super.createResourceManager(
createActiveResourceManagerConfiguration(configuration),
resourceId,
@@ -65,7 +68,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
clusterInformation,
webInterfaceUrl,
metricRegistry,
- hostname);
+ hostname,
+ ioExecutor);
}
private Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
@@ -87,7 +91,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
+ ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+ Executor ioExecutor) {
return new ActiveResourceManager<>(
createResourceManagerDriver(configuration),
@@ -101,7 +106,8 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
- resourceManagerMetricGroup);
+ resourceManagerMetricGroup,
+ ioExecutor);
}
protected abstract ResourceManagerDriver<WorkerType> createResourceManagerDriver(Configuration configuration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java
index c6fba52..6142ed1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManager.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Base class for {@link ResourceManager} implementations which contains some common variables and methods.
@@ -74,7 +75,8 @@ public abstract class LegacyActiveResourceManager<WorkerType extends ResourceIDR
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
+ ResourceManagerMetricGroup resourceManagerMetricGroup,
+ Executor ioExecutor) {
super(
rpcService,
resourceId,
@@ -86,7 +88,8 @@ public abstract class LegacyActiveResourceManager<WorkerType extends ResourceIDR
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
- AkkaUtils.getTimeoutAsTime(flinkConfig));
+ AkkaUtils.getTimeoutAsTime(flinkConfig),
+ ioExecutor);
this.flinkConfig = flinkConfig;
this.env = env;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java
index bfc95ad..2244930 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/LegacyActiveResourceManagerFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.rpc.RpcService;
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
/**
* Resource manager factory which creates active {@link ResourceManager} implementations.
*
@@ -56,7 +58,8 @@ public abstract class LegacyActiveResourceManagerFactory<T extends ResourceIDRet
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
MetricRegistry metricRegistry,
- String hostname) throws Exception {
+ String hostname,
+ Executor ioExecutor) throws Exception {
return super.createResourceManager(
createActiveResourceManagerConfiguration(configuration),
resourceId,
@@ -67,7 +70,8 @@ public abstract class LegacyActiveResourceManagerFactory<T extends ResourceIDRet
clusterInformation,
webInterfaceUrl,
metricRegistry,
- hostname);
+ hostname,
+ ioExecutor);
}
private Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 19d4bb8..807dbfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
/**
* ResourceManager HA test, including grant leadership and revoke leadership.
@@ -101,7 +102,8 @@ public class ResourceManagerHATest extends TestLogger {
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
Time.minutes(5L),
- RpcUtils.INF_TIMEOUT) {
+ RpcUtils.INF_TIMEOUT,
+ ForkJoinPool.commonPool()) {
@Override
public void revokeLeadership() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index b3814fa..efeb6ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -54,6 +54,7 @@ import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
@@ -153,7 +154,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
Time.minutes(5L),
- RpcUtils.INF_TIMEOUT);
+ RpcUtils.INF_TIMEOUT,
+ ForkJoinPool.commonPool());
resourceManager.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index d95b88f..a9abd0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -63,6 +63,7 @@ import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -165,7 +166,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
fatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
Time.minutes(5L),
- RpcUtils.INF_TIMEOUT);
+ RpcUtils.INF_TIMEOUT,
+ ForkJoinPool.commonPool());
resourceManager.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
index 1583e47..01e68be 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -157,7 +158,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
fatalErrorHandler,
resourceManagerMetricGroup,
startupPeriodTime,
- RpcUtils.INF_TIMEOUT);
+ RpcUtils.INF_TIMEOUT,
+ ForkJoinPool.commonPool());
this.rmServices = rmServices;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index e282a58..112171f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.rpc.RpcUtils;
import javax.annotation.Nullable;
+import java.util.concurrent.ForkJoinPool;
+
/**
* Simple {@link ResourceManager} implementation for testing purposes.
*/
@@ -59,7 +61,8 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
new ClusterInformation("localhost", 1234),
fatalErrorHandler,
resourceManagerMetricGroup,
- RpcUtils.INF_TIMEOUT);
+ RpcUtils.INF_TIMEOUT,
+ ForkJoinPool.commonPool());
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 17c7a7b..2a6fb00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -53,6 +53,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -427,7 +428,8 @@ public class ActiveResourceManagerTest extends TestLogger {
rmServices.jobLeaderIdService,
new ClusterInformation("localhost", 1234),
fatalErrorHandler,
- UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
+ ForkJoinPool.commonPool());
activeResourceManager.start();
rmServices.grantLeadership();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 2c6b335..38a1cdb 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -79,6 +79,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.stream.Collectors;
/**
@@ -135,7 +136,8 @@ public class YarnResourceManager extends LegacyActiveResourceManager<YarnWorkerN
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
+ ResourceManagerMetricGroup resourceManagerMetricGroup,
+ Executor ioExecutor) {
super(
flinkConfig,
env,
@@ -148,7 +150,8 @@ public class YarnResourceManager extends LegacyActiveResourceManager<YarnWorkerN
jobLeaderIdService,
clusterInformation,
fatalErrorHandler,
- resourceManagerMetricGroup);
+ resourceManagerMetricGroup,
+ ioExecutor);
this.yarnConfig = new YarnConfiguration();
this.workerNodeMap = new ConcurrentHashMap<>();
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index c846f7d..4c5b55c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.yarn.YarnWorkerNode;
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
/**
* {@link ResourceManagerFactory} implementation which creates a {@link YarnResourceManager}.
*/
@@ -62,7 +64,8 @@ public class YarnResourceManagerFactory extends LegacyActiveResourceManagerFacto
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
+ ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+ Executor ioExecutor) {
return new YarnResourceManager(
rpcService,
@@ -77,7 +80,8 @@ public class YarnResourceManagerFactory extends LegacyActiveResourceManagerFacto
clusterInformation,
fatalErrorHandler,
webInterfaceUrl,
- resourceManagerMetricGroup);
+ resourceManagerMetricGroup,
+ ioExecutor);
}
@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 852968e..e0e3031 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -102,6 +102,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -209,7 +210,8 @@ public class YarnResourceManagerTest extends TestLogger {
clusterInformation,
fatalErrorHandler,
webInterfaceUrl,
- resourceManagerMetricGroup);
+ resourceManagerMetricGroup,
+ ForkJoinPool.commonPool());
this.testingYarnNMClientAsync = new TestingYarnNMClientAsync(this);
this.testingYarnAMRMClientAsync = new TestingYarnAMRMClientAsync(this);
}