You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 06:14:52 UTC
[flink] 03/16: [hotfix] Make the DefaultExecutorServiceLoader a singleton
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2b1fd1fcc80657215ad325a9191ae5e3e77fffda
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:20:46 2019 +0100
[hotfix] Make the DefaultExecutorServiceLoader a singleton
---
.../flink/core/execution/DefaultExecutorServiceLoader.java | 12 +++++++++---
.../java/org/apache/flink/api/java/ExecutionEnvironment.java | 2 +-
.../api/environment/StreamExecutionEnvironment.java | 2 +-
3 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 241feab..8bde967 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -45,6 +45,8 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class);
+ public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
+
@Override
public ExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
@@ -67,14 +69,18 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
}
if (compatibleFactories.size() > 1) {
- final List<String> configStr =
+ final String configStr =
configuration.toMap().entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
- .collect(Collectors.toList());
+ .collect(Collectors.joining("\n"));
- throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
+ throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
}
return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
}
+
+ private DefaultExecutorServiceLoader() {
+ // make sure nobody instantiates us explicitly.
+ }
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..26632e6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -140,7 +140,7 @@ public class ExecutionEnvironment {
}
protected ExecutionEnvironment(final Configuration executorConfiguration) {
- this(new DefaultExecutorServiceLoader(), executorConfiguration);
+ this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
}
protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..ba702ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -163,7 +163,7 @@ public class StreamExecutionEnvironment {
}
public StreamExecutionEnvironment(final Configuration executorConfiguration) {
- this(new DefaultExecutorServiceLoader(), executorConfiguration);
+ this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
}
public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {