You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 06:14:52 UTC

[flink] 03/16: [hotfix] Make the DefaultExecutorServiceLoader a singleton

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b1fd1fcc80657215ad325a9191ae5e3e77fffda
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:20:46 2019 +0100

    [hotfix] Make the DefaultExecutorServiceLoader a singleton
---
 .../flink/core/execution/DefaultExecutorServiceLoader.java   | 12 +++++++++---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java |  2 +-
 .../api/environment/StreamExecutionEnvironment.java          |  2 +-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 241feab..8bde967 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -45,6 +45,8 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
 	private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class);
 
+	public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
+
 	@Override
 	public ExecutorFactory getExecutorFactory(final Configuration configuration) {
 		checkNotNull(configuration);
@@ -67,14 +69,18 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 		}
 
 		if (compatibleFactories.size() > 1) {
-			final List<String> configStr =
+			final String configStr =
 					configuration.toMap().entrySet().stream()
 							.map(e -> e.getKey() + "=" + e.getValue())
-							.collect(Collectors.toList());
+							.collect(Collectors.joining("\n"));
 
-			throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
+			throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
 		}
 
 		return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
 	}
+
+	private DefaultExecutorServiceLoader() {
+		// make sure nobody instantiates us explicitly.
+	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..26632e6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -140,7 +140,7 @@ public class ExecutionEnvironment {
 	}
 
 	protected ExecutionEnvironment(final Configuration executorConfiguration) {
-		this(new DefaultExecutorServiceLoader(), executorConfiguration);
+		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
 	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..ba702ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -163,7 +163,7 @@ public class StreamExecutionEnvironment {
 	}
 
 	public StreamExecutionEnvironment(final Configuration executorConfiguration) {
-		this(new DefaultExecutorServiceLoader(), executorConfiguration);
+		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
 	public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {