You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/22 13:59:22 UTC

[flink] branch master updated (dc102ec -> 1ad47e3)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from dc102ec  [FLINK-18378] Improve CatalogTable schema resolution
     new b05ab52  [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
     new 1ad47e3  [hotfix] Annotate the DefaultClusterClientServiceLoader as internal

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/client/cli/CliFrontend.java  |  2 +-
 .../java/org/apache/flink/client/cli/GenericCLI.java   |  2 +-
 .../deployment/DefaultClusterClientServiceLoader.java  |  9 ++++++---
 .../core/execution/DefaultExecutorServiceLoader.java   | 18 ++++++++----------
 .../apache/flink/api/java/ExecutionEnvironment.java    |  2 +-
 .../api/environment/RemoteStreamEnvironment.java       |  2 +-
 .../api/environment/StreamExecutionEnvironment.java    |  2 +-
 .../table/client/gateway/local/ProgramDeployer.java    |  2 +-
 8 files changed, 20 insertions(+), 19 deletions(-)


[flink] 01/02: [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b05ab524836c1c07d128610248fe372c8d697974
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jun 19 14:10:02 2020 +0200

    [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
    
    This closes #12719.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  |  2 +-
 .../java/org/apache/flink/client/cli/GenericCLI.java   |  2 +-
 .../deployment/DefaultClusterClientServiceLoader.java  |  7 ++++---
 .../core/execution/DefaultExecutorServiceLoader.java   | 18 ++++++++----------
 .../apache/flink/api/java/ExecutionEnvironment.java    |  2 +-
 .../api/environment/RemoteStreamEnvironment.java       |  2 +-
 .../api/environment/StreamExecutionEnvironment.java    |  2 +-
 .../table/client/gateway/local/ProgramDeployer.java    |  2 +-
 8 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 159507e..ef7d7c9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -696,7 +696,7 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
-		ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program, false, false);
+		ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
 	}
 
 	/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
index 945779b..37628b3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
@@ -146,7 +146,7 @@ public class GenericCLI implements CustomCommandLine {
 	}
 
 	private static String getExecutorFactoryNames() {
-		return DefaultExecutorServiceLoader.INSTANCE.getExecutorNames()
+		return new DefaultExecutorServiceLoader().getExecutorNames()
 				.map(name -> String.format("\"%s\"", name))
 				.collect(Collectors.joining(", "));
 	}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index b2c43a2..ec56abe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -38,14 +38,15 @@ public class DefaultClusterClientServiceLoader implements ClusterClientServiceLo
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
 
-	private static final ServiceLoader<ClusterClientFactory> defaultLoader = ServiceLoader.load(ClusterClientFactory.class);
-
 	@Override
 	public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) {
 		checkNotNull(configuration);
 
+		final ServiceLoader<ClusterClientFactory> loader =
+				ServiceLoader.load(ClusterClientFactory.class);
+
 		final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
-		final Iterator<ClusterClientFactory> factories = defaultLoader.iterator();
+		final Iterator<ClusterClientFactory> factories = loader.iterator();
 		while (factories.hasNext()) {
 			try {
 				final ClusterClientFactory factory = factories.next();
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 6b07f7f..b5e9f08 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -47,16 +47,15 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);
 
-	private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
-
-	public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
-
 	@Override
 	public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
 		checkNotNull(configuration);
 
+		final ServiceLoader<PipelineExecutorFactory> loader =
+				ServiceLoader.load(PipelineExecutorFactory.class);
+
 		final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
-		final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
+		final Iterator<PipelineExecutorFactory> factories = loader.iterator();
 		while (factories.hasNext()) {
 			try {
 				final PipelineExecutorFactory factory = factories.next();
@@ -90,11 +89,10 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
 
 	@Override
 	public Stream<String> getExecutorNames() {
-		return StreamSupport.stream(defaultLoader.spliterator(), false)
-				.map(PipelineExecutorFactory::getName);
-	}
+		final ServiceLoader<PipelineExecutorFactory> loader =
+				ServiceLoader.load(PipelineExecutorFactory.class);
 
-	private DefaultExecutorServiceLoader() {
-		// make sure nobody instantiates us explicitly.
+		return StreamSupport.stream(loader.spliterator(), false)
+				.map(PipelineExecutorFactory::getName);
 	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9f0089d..910c419 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -157,7 +157,7 @@ public class ExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) {
-		this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
+		this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 560633e..8dd09f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -141,7 +141,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths, SavepointRestoreSettings savepointRestoreSettings) {
-		this(DefaultExecutorServiceLoader.INSTANCE, host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
+		this(new DefaultExecutorServiceLoader(), host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
 	}
 
 	@PublicEvolving
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c688506..7de2e97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -203,7 +203,7 @@ public class StreamExecutionEnvironment {
 	public StreamExecutionEnvironment(
 			final Configuration configuration,
 			final ClassLoader userClassloader) {
-		this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
+		this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
 	}
 
 	/**
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 0070b1f..31b5143 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -68,7 +68,7 @@ public class ProgramDeployer {
 			throw new RuntimeException("No execution.target specified in your configuration file.");
 		}
 
-		PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
+		PipelineExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
 		final PipelineExecutorFactory executorFactory;
 		try {
 			executorFactory = executorServiceLoader.getExecutorFactory(configuration);


[flink] 02/02: [hotfix] Annotate the DefaultClusterClientServiceLoader as internal

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ad47e3ebe5d0de739a58966992a3b45c34bcb32
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jun 19 14:12:05 2020 +0200

    [hotfix] Annotate the DefaultClusterClientServiceLoader as internal
---
 .../flink/client/deployment/DefaultClusterClientServiceLoader.java      | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index ec56abe..0ba773b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.Logger;
@@ -34,6 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A service provider for {@link ClusterClientFactory cluster client factories}.
  */
+@Internal
 public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);