You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/22 13:59:23 UTC

[flink] 01/02: [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b05ab524836c1c07d128610248fe372c8d697974
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jun 19 14:10:02 2020 +0200

    [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
    
    This closes #12719.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  |  2 +-
 .../java/org/apache/flink/client/cli/GenericCLI.java   |  2 +-
 .../deployment/DefaultClusterClientServiceLoader.java  |  7 ++++---
 .../core/execution/DefaultExecutorServiceLoader.java   | 18 ++++++++----------
 .../apache/flink/api/java/ExecutionEnvironment.java    |  2 +-
 .../api/environment/RemoteStreamEnvironment.java       |  2 +-
 .../api/environment/StreamExecutionEnvironment.java    |  2 +-
 .../table/client/gateway/local/ProgramDeployer.java    |  2 +-
 8 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 159507e..ef7d7c9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -696,7 +696,7 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
-		ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program, false, false);
+		ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
 	}
 
 	/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
index 945779b..37628b3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
@@ -146,7 +146,7 @@ public class GenericCLI implements CustomCommandLine {
 	}
 
 	private static String getExecutorFactoryNames() {
-		return DefaultExecutorServiceLoader.INSTANCE.getExecutorNames()
+		return new DefaultExecutorServiceLoader().getExecutorNames()
 				.map(name -> String.format("\"%s\"", name))
 				.collect(Collectors.joining(", "));
 	}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index b2c43a2..ec56abe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -38,14 +38,15 @@ public class DefaultClusterClientServiceLoader implements ClusterClientServiceLo
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
 
-	private static final ServiceLoader<ClusterClientFactory> defaultLoader = ServiceLoader.load(ClusterClientFactory.class);
-
 	@Override
 	public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) {
 		checkNotNull(configuration);
 
+		final ServiceLoader<ClusterClientFactory> loader =
+				ServiceLoader.load(ClusterClientFactory.class);
+
 		final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
-		final Iterator<ClusterClientFactory> factories = defaultLoader.iterator();
+		final Iterator<ClusterClientFactory> factories = loader.iterator();
 		while (factories.hasNext()) {
 			try {
 				final ClusterClientFactory factory = factories.next();
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 6b07f7f..b5e9f08 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -47,16 +47,15 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);
 
-	private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
-
-	public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
-
 	@Override
 	public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
 		checkNotNull(configuration);
 
+		final ServiceLoader<PipelineExecutorFactory> loader =
+				ServiceLoader.load(PipelineExecutorFactory.class);
+
 		final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
-		final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
+		final Iterator<PipelineExecutorFactory> factories = loader.iterator();
 		while (factories.hasNext()) {
 			try {
 				final PipelineExecutorFactory factory = factories.next();
@@ -90,11 +89,10 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
 
 	@Override
 	public Stream<String> getExecutorNames() {
-		return StreamSupport.stream(defaultLoader.spliterator(), false)
-				.map(PipelineExecutorFactory::getName);
-	}
+		final ServiceLoader<PipelineExecutorFactory> loader =
+				ServiceLoader.load(PipelineExecutorFactory.class);
 
-	private DefaultExecutorServiceLoader() {
-		// make sure nobody instantiates us explicitly.
+		return StreamSupport.stream(loader.spliterator(), false)
+				.map(PipelineExecutorFactory::getName);
 	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9f0089d..910c419 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -157,7 +157,7 @@ public class ExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) {
-		this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
+		this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 560633e..8dd09f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -141,7 +141,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths, SavepointRestoreSettings savepointRestoreSettings) {
-		this(DefaultExecutorServiceLoader.INSTANCE, host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
+		this(new DefaultExecutorServiceLoader(), host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
 	}
 
 	@PublicEvolving
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c688506..7de2e97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -203,7 +203,7 @@ public class StreamExecutionEnvironment {
 	public StreamExecutionEnvironment(
 			final Configuration configuration,
 			final ClassLoader userClassloader) {
-		this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
+		this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
 	}
 
 	/**
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 0070b1f..31b5143 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -68,7 +68,7 @@ public class ProgramDeployer {
 			throw new RuntimeException("No execution.target specified in your configuration file.");
 		}
 
-		PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
+		PipelineExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
 		final PipelineExecutorFactory executorFactory;
 		try {
 			executorFactory = executorServiceLoader.getExecutorFactory(configuration);