You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 14:44:56 UTC
[flink] 03/14: [hotfix] Merge configurations in argument list of
runProgram() in CLIFrontend
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e90d6ae6989143855d56ed80462678142f56bae
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 15:08:25 2019 +0100
[hotfix] Merge configurations in argument list of runProgram() in CLIFrontend
---
.../org/apache/flink/client/cli/CliFrontend.java | 44 ++++++++++++++--------
.../flink/client/cli/ExecutionConfigAccessor.java | 5 ++-
2 files changed, 31 insertions(+), 18 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 38243fc..5ed2901 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -208,33 +208,45 @@ public class CliFrontend {
throw new CliArgsException("Could not build the program from JAR file.", e);
}
- final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
- final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-
- final List<URL> jobJars = program.getJobJarAndDependencies();
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
- final Configuration executionConfig = executionParameters.getConfiguration();
+ final Configuration effectiveConfiguration = getEffectiveConfiguration(
+ commandLine,
+ programOptions,
+ program.getJobJarAndDependencies());
try {
- runProgram(executorConfig, executionConfig, program);
+ runProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
}
+ private Configuration getEffectiveConfiguration(
+ final CommandLine commandLine,
+ final ProgramOptions programOptions,
+ final List<URL> jobJars) throws FlinkException {
+
+ final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine));
+ final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(
+ checkNotNull(programOptions),
+ checkNotNull(jobJars));
+
+ final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+ final Configuration effectiveConfiguration = new Configuration(executorConfig);
+ return executionParameters.applyToConfiguration(effectiveConfiguration);
+ }
+
private <ClusterID> void runProgram(
- Configuration executorConfig,
- Configuration executionConfig,
+ Configuration configuration,
PackagedProgram program) throws ProgramInvocationException, FlinkException {
- final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+ final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
checkNotNull(clusterClientFactory);
- final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);
+ final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
try {
- final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig);
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+ final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+ final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
final ClusterClient<ClusterID> client;
// directly deploy the job if the cluster is started in job mode and detached
@@ -243,7 +255,7 @@ public class CliFrontend {
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
- final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+ final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
client = clusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
@@ -264,7 +276,7 @@ public class CliFrontend {
} else {
// also in job mode we have to deploy a session cluster because the job
// might consist of multiple parts (e.g. when using collect)
- final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+ final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
client = clusterDescriptor.deploySessionCluster(clusterSpecification);
// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
// there's a race-condition here if cli is killed before shutdown hook is installed
@@ -279,7 +291,7 @@ public class CliFrontend {
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
- executeProgram(executionConfig, program, client);
+ executeProgram(configuration, program, client);
} finally {
if (clusterId == null && !executionParameters.getDetachedMode()) {
// terminate the cluster only if we have started it before and if it's not detached
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index 9e570e1..f55560b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -77,8 +77,9 @@ public class ExecutionConfigAccessor {
return new ExecutionConfigAccessor(configuration);
}
- public Configuration getConfiguration() {
- return configuration;
+ Configuration applyToConfiguration(final Configuration baseConfiguration) {
+ baseConfiguration.addAll(configuration);
+ return baseConfiguration;
}
public List<URL> getJars() {