You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/14 09:48:18 UTC
[flink] 05/07: [FLINK-14745] Add dependencies of job as list of URLs in config
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd8570f8551c78608b847109c0a61f8a46c6cb65
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 13 15:09:03 2019 +0100
[FLINK-14745] Add dependencies of job as list of URLs in config
---
.../src/main/java/org/apache/flink/client/ClientUtils.java | 2 +-
.../src/main/java/org/apache/flink/client/cli/CliFrontend.java | 3 ++-
.../org/apache/flink/client/cli/ExecutionConfigAccessor.java | 9 ++++++++-
.../java/org/apache/flink/client/program/PackagedProgram.java | 4 ++--
.../org/apache/flink/client/program/PackagedProgramUtils.java | 2 +-
.../java/org/apache/flink/client/cli/CliFrontendRunTest.java | 4 ++--
.../flink/table/client/gateway/local/ExecutionContext.java | 10 ++++------
7 files changed, 20 insertions(+), 14 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index f756449..043b740 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -147,7 +147,7 @@ public enum ClientUtils {
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
client,
- program.getAllLibraries(),
+ program.getJobJarAndDependencies(),
program.getClasspaths(),
program.getUserCodeClassLoader(),
parallelism,
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 6e9b2f9..9ff6d53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -211,7 +211,8 @@ public class CliFrontend {
final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+ final List<URL> jobJars = program.getJobJarAndDependencies();
+ final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
final Configuration executionConfig = executionParameters.getConfiguration();
try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index ee32449..ec627ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -55,15 +55,18 @@ public class ExecutionConfigAccessor {
/**
* Creates an {@link ExecutionConfigAccessor} based on the provided {@link ProgramOptions} as provided by the user through the CLI.
*/
- public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options) {
+ public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) {
checkNotNull(options);
+ checkNotNull(jobJars);
final Configuration configuration = new Configuration();
+
configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
+ ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString);
SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);
@@ -74,6 +77,10 @@ public class ExecutionConfigAccessor {
return configuration;
}
+ public List<URL> getJars() {
+ return decodeUrlList(configuration, PipelineOptions.JARS);
+ }
+
public List<URL> getClasspaths() {
return decodeUrlList(configuration, PipelineOptions.CLASSPATHS);
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2b593ca..1d96646 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -140,7 +140,7 @@ public class PackagedProgram {
// now that we have an entry point, we can extract the nested jar files (if any)
this.extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl);
this.classpaths = classpaths;
- this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
+ this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(getJobJarAndDependencies(), classpaths, getClass().getClassLoader());
// load the entry point class
this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
@@ -227,7 +227,7 @@ public class PackagedProgram {
/**
* Returns all provided libraries needed to run the program.
*/
- public List<URL> getAllLibraries() {
+ public List<URL> getJobJarAndDependencies() {
List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
if (jarFile != null) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 00ac231..edf3617 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -63,7 +63,7 @@ public class PackagedProgramUtils {
if (jobID != null) {
jobGraph.setJobID(jobID);
}
- jobGraph.addJars(packagedProgram.getAllLibraries());
+ jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
jobGraph.setClasspaths(packagedProgram.getClasspaths());
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index d2aff28..449e1b2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -85,7 +85,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+ ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList());
SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
@@ -99,7 +99,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+ ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList());
SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index e37635f..99c43a8 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -113,7 +113,6 @@ public class ExecutionContext<ClusterID> {
private final SessionContext sessionContext;
private final Environment mergedEnv;
- private final List<URL> dependencies;
private final ClassLoader classLoader;
private final Map<String, Module> modules;
private final Map<String, Catalog> catalogs;
@@ -136,7 +135,6 @@ public class ExecutionContext<ClusterID> {
Configuration flinkConfig, ClusterClientServiceLoader clusterClientServiceLoader, Options commandLineOptions, List<CustomCommandLine> availableCommandLines) throws FlinkException {
this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable
this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment());
- this.dependencies = dependencies;
this.flinkConfig = flinkConfig;
// create class loader
@@ -184,7 +182,7 @@ public class ExecutionContext<ClusterID> {
clusterClientFactory = serviceLoader.getClusterClientFactory(executorConfig);
checkState(clusterClientFactory != null);
- executionParameters = createExecutionParameterProvider(commandLine);
+ executionParameters = createExecutionParameterProvider(commandLine, dependencies);
clusterId = clusterClientFactory.getClusterId(executorConfig);
clusterSpec = clusterClientFactory.getClusterSpecification(executorConfig);
}
@@ -262,10 +260,10 @@ public class ExecutionContext<ClusterID> {
throw new SqlExecutionException("Could not find a matching deployment.");
}
- private static ExecutionConfigAccessor createExecutionParameterProvider(CommandLine commandLine) {
+ private static ExecutionConfigAccessor createExecutionParameterProvider(CommandLine commandLine, List<URL> jobJars) {
try {
final ProgramOptions programOptions = new ProgramOptions(commandLine);
- return ExecutionConfigAccessor.fromProgramOptions(programOptions);
+ return ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
} catch (CliArgsException e) {
throw new SqlExecutionException("Invalid deployment run options.", e);
}
@@ -487,7 +485,7 @@ public class ExecutionContext<ClusterID> {
flinkConfig,
parallelism);
- jobGraph.addJars(dependencies);
+ jobGraph.addJars(executionParameters.getJars());
jobGraph.setClasspaths(executionParameters.getClasspaths());
jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());