You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/22 13:59:23 UTC
[flink] 01/02: [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b05ab524836c1c07d128610248fe372c8d697974
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jun 19 14:10:02 2020 +0200
[FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
This closes #12719.
---
.../java/org/apache/flink/client/cli/CliFrontend.java | 2 +-
.../java/org/apache/flink/client/cli/GenericCLI.java | 2 +-
.../deployment/DefaultClusterClientServiceLoader.java | 7 ++++---
.../core/execution/DefaultExecutorServiceLoader.java | 18 ++++++++----------
.../apache/flink/api/java/ExecutionEnvironment.java | 2 +-
.../api/environment/RemoteStreamEnvironment.java | 2 +-
.../api/environment/StreamExecutionEnvironment.java | 2 +-
.../table/client/gateway/local/ProgramDeployer.java | 2 +-
8 files changed, 18 insertions(+), 19 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 159507e..ef7d7c9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -696,7 +696,7 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
- ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program, false, false);
+ ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
index 945779b..37628b3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
@@ -146,7 +146,7 @@ public class GenericCLI implements CustomCommandLine {
}
private static String getExecutorFactoryNames() {
- return DefaultExecutorServiceLoader.INSTANCE.getExecutorNames()
+ return new DefaultExecutorServiceLoader().getExecutorNames()
.map(name -> String.format("\"%s\"", name))
.collect(Collectors.joining(", "));
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index b2c43a2..ec56abe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -38,14 +38,15 @@ public class DefaultClusterClientServiceLoader implements ClusterClientServiceLo
private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
- private static final ServiceLoader<ClusterClientFactory> defaultLoader = ServiceLoader.load(ClusterClientFactory.class);
-
@Override
public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) {
checkNotNull(configuration);
+ final ServiceLoader<ClusterClientFactory> loader =
+ ServiceLoader.load(ClusterClientFactory.class);
+
final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
- final Iterator<ClusterClientFactory> factories = defaultLoader.iterator();
+ final Iterator<ClusterClientFactory> factories = loader.iterator();
while (factories.hasNext()) {
try {
final ClusterClientFactory factory = factories.next();
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 6b07f7f..b5e9f08 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -47,16 +47,15 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);
- private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
-
- public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
-
@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
+ final ServiceLoader<PipelineExecutorFactory> loader =
+ ServiceLoader.load(PipelineExecutorFactory.class);
+
final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
- final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
+ final Iterator<PipelineExecutorFactory> factories = loader.iterator();
while (factories.hasNext()) {
try {
final PipelineExecutorFactory factory = factories.next();
@@ -90,11 +89,10 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
@Override
public Stream<String> getExecutorNames() {
- return StreamSupport.stream(defaultLoader.spliterator(), false)
- .map(PipelineExecutorFactory::getName);
- }
+ final ServiceLoader<PipelineExecutorFactory> loader =
+ ServiceLoader.load(PipelineExecutorFactory.class);
- private DefaultExecutorServiceLoader() {
- // make sure nobody instantiates us explicitly.
+ return StreamSupport.stream(loader.spliterator(), false)
+ .map(PipelineExecutorFactory::getName);
}
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9f0089d..910c419 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -157,7 +157,7 @@ public class ExecutionEnvironment {
*/
@PublicEvolving
public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) {
- this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
+ this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
}
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 560633e..8dd09f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -141,7 +141,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
*/
@PublicEvolving
public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths, SavepointRestoreSettings savepointRestoreSettings) {
- this(DefaultExecutorServiceLoader.INSTANCE, host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
+ this(new DefaultExecutorServiceLoader(), host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
}
@PublicEvolving
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c688506..7de2e97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -203,7 +203,7 @@ public class StreamExecutionEnvironment {
public StreamExecutionEnvironment(
final Configuration configuration,
final ClassLoader userClassloader) {
- this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
+ this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
}
/**
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 0070b1f..31b5143 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -68,7 +68,7 @@ public class ProgramDeployer {
throw new RuntimeException("No execution.target specified in your configuration file.");
}
- PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
+ PipelineExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
final PipelineExecutorFactory executorFactory;
try {
executorFactory = executorServiceLoader.getExecutorFactory(configuration);