You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 15:53:23 UTC

[flink] 03/16: [hotfix] Merge configurations in argument list of runProgram() in CLIFrontend

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e90d6ae6989143855d56ed80462678142f56bae
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 15:08:25 2019 +0100

    [hotfix] Merge configurations in argument list of runProgram() in CLIFrontend
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 44 ++++++++++++++--------
 .../flink/client/cli/ExecutionConfigAccessor.java  |  5 ++-
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 38243fc..5ed2901 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -208,33 +208,45 @@ public class CliFrontend {
 			throw new CliArgsException("Could not build the program from JAR file.", e);
 		}
 
-		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
-		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-
-		final List<URL> jobJars = program.getJobJarAndDependencies();
-		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
-		final Configuration executionConfig = executionParameters.getConfiguration();
+		final Configuration effectiveConfiguration = getEffectiveConfiguration(
+				commandLine,
+				programOptions,
+				program.getJobJarAndDependencies());
 
 		try {
-			runProgram(executorConfig, executionConfig, program);
+			runProgram(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
 	}
 
+	private Configuration getEffectiveConfiguration(
+			final CommandLine commandLine,
+			final ProgramOptions programOptions,
+			final List<URL> jobJars) throws FlinkException {
+
+		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine));
+		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(
+				checkNotNull(programOptions),
+				checkNotNull(jobJars));
+
+		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		final Configuration effectiveConfiguration = new Configuration(executorConfig);
+		return executionParameters.applyToConfiguration(effectiveConfiguration);
+	}
+
 	private <ClusterID> void runProgram(
-			Configuration executorConfig,
-			Configuration executionConfig,
+			Configuration configuration,
 			PackagedProgram program) throws ProgramInvocationException, FlinkException {
 
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
 		checkNotNull(clusterClientFactory);
 
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);
+		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
 			final ClusterClient<ClusterID> client;
 
 			// directly deploy the job if the cluster is started in job mode and detached
@@ -243,7 +255,7 @@ public class CliFrontend {
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 				client = clusterDescriptor.deployJobCluster(
 					clusterSpecification,
 					jobGraph,
@@ -264,7 +276,7 @@ public class CliFrontend {
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
@@ -279,7 +291,7 @@ public class CliFrontend {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 
-					executeProgram(executionConfig, program, client);
+					executeProgram(configuration, program, client);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index 9e570e1..f55560b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -77,8 +77,9 @@ public class ExecutionConfigAccessor {
 		return new ExecutionConfigAccessor(configuration);
 	}
 
-	public Configuration getConfiguration() {
-		return configuration;
+	Configuration applyToConfiguration(final Configuration baseConfiguration) {
+		baseConfiguration.addAll(configuration);
+		return baseConfiguration;
 	}
 
 	public List<URL> getJars() {