You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/14 09:48:18 UTC

[flink] 05/07: [FLINK-14745] Add dependencies of job as list of URLs in config

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd8570f8551c78608b847109c0a61f8a46c6cb65
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 13 15:09:03 2019 +0100

    [FLINK-14745] Add dependencies of job as list of URLs in config
---
 .../src/main/java/org/apache/flink/client/ClientUtils.java     |  2 +-
 .../src/main/java/org/apache/flink/client/cli/CliFrontend.java |  3 ++-
 .../org/apache/flink/client/cli/ExecutionConfigAccessor.java   |  9 ++++++++-
 .../java/org/apache/flink/client/program/PackagedProgram.java  |  4 ++--
 .../org/apache/flink/client/program/PackagedProgramUtils.java  |  2 +-
 .../java/org/apache/flink/client/cli/CliFrontendRunTest.java   |  4 ++--
 .../flink/table/client/gateway/local/ExecutionContext.java     | 10 ++++------
 7 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index f756449..043b740 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -147,7 +147,7 @@ public enum ClientUtils {
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
 				client,
-				program.getAllLibraries(),
+				program.getJobJarAndDependencies(),
 				program.getClasspaths(),
 				program.getUserCodeClassLoader(),
 				parallelism,
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 6e9b2f9..9ff6d53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -211,7 +211,8 @@ public class CliFrontend {
 		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
 		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+		final List<URL> jobJars = program.getJobJarAndDependencies();
+		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
 		final Configuration executionConfig = executionParameters.getConfiguration();
 
 		try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index ee32449..ec627ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -55,15 +55,18 @@ public class ExecutionConfigAccessor {
 	/**
 	 * Creates an {@link ExecutionConfigAccessor} based on the provided {@link ProgramOptions} as provided by the user through the CLI.
 	 */
-	public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options) {
+	public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) {
 		checkNotNull(options);
+		checkNotNull(jobJars);
 
 		final Configuration configuration = new Configuration();
+
 		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
 		configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
 		configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());
 
 		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
+		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString);
 
 		SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);
 
@@ -74,6 +77,10 @@ public class ExecutionConfigAccessor {
 		return configuration;
 	}
 
+	public List<URL> getJars() {
+		return decodeUrlList(configuration, PipelineOptions.JARS);
+	}
+
 	public List<URL> getClasspaths() {
 		return decodeUrlList(configuration, PipelineOptions.CLASSPATHS);
 	}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2b593ca..1d96646 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -140,7 +140,7 @@ public class PackagedProgram {
 		// now that we have an entry point, we can extract the nested jar files (if any)
 		this.extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl);
 		this.classpaths = classpaths;
-		this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
+		this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(getJobJarAndDependencies(), classpaths, getClass().getClassLoader());
 
 		// load the entry point class
 		this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
@@ -227,7 +227,7 @@ public class PackagedProgram {
 	/**
 	 * Returns all provided libraries needed to run the program.
 	 */
-	public List<URL> getAllLibraries() {
+	public List<URL> getJobJarAndDependencies() {
 		List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
 
 		if (jarFile != null) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 00ac231..edf3617 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -63,7 +63,7 @@ public class PackagedProgramUtils {
 		if (jobID != null) {
 			jobGraph.setJobID(jobID);
 		}
-		jobGraph.addJars(packagedProgram.getAllLibraries());
+		jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
 		jobGraph.setClasspaths(packagedProgram.getClasspaths());
 		jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index d2aff28..449e1b2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -85,7 +85,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 
 			CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
 			ProgramOptions programOptions = new ProgramOptions(commandLine);
-			ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+			ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList());
 
 			SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
 			assertTrue(savepointSettings.restoreSavepoint());
@@ -99,7 +99,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 
 			CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
 			ProgramOptions programOptions = new ProgramOptions(commandLine);
-			ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+			ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList());
 
 			SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
 			assertTrue(savepointSettings.restoreSavepoint());
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index e37635f..99c43a8 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -113,7 +113,6 @@ public class ExecutionContext<ClusterID> {
 
 	private final SessionContext sessionContext;
 	private final Environment mergedEnv;
-	private final List<URL> dependencies;
 	private final ClassLoader classLoader;
 	private final Map<String, Module> modules;
 	private final Map<String, Catalog> catalogs;
@@ -136,7 +135,6 @@ public class ExecutionContext<ClusterID> {
 			Configuration flinkConfig, ClusterClientServiceLoader clusterClientServiceLoader, Options commandLineOptions, List<CustomCommandLine> availableCommandLines) throws FlinkException {
 		this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable
 		this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment());
-		this.dependencies = dependencies;
 		this.flinkConfig = flinkConfig;
 
 		// create class loader
@@ -184,7 +182,7 @@ public class ExecutionContext<ClusterID> {
 		clusterClientFactory = serviceLoader.getClusterClientFactory(executorConfig);
 		checkState(clusterClientFactory != null);
 
-		executionParameters = createExecutionParameterProvider(commandLine);
+		executionParameters = createExecutionParameterProvider(commandLine, dependencies);
 		clusterId = clusterClientFactory.getClusterId(executorConfig);
 		clusterSpec = clusterClientFactory.getClusterSpecification(executorConfig);
 	}
@@ -262,10 +260,10 @@ public class ExecutionContext<ClusterID> {
 		throw new SqlExecutionException("Could not find a matching deployment.");
 	}
 
-	private static ExecutionConfigAccessor createExecutionParameterProvider(CommandLine commandLine) {
+	private static ExecutionConfigAccessor createExecutionParameterProvider(CommandLine commandLine, List<URL> jobJars) {
 		try {
 			final ProgramOptions programOptions = new ProgramOptions(commandLine);
-			return ExecutionConfigAccessor.fromProgramOptions(programOptions);
+			return ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
 		} catch (CliArgsException e) {
 			throw new SqlExecutionException("Invalid deployment run options.", e);
 		}
@@ -487,7 +485,7 @@ public class ExecutionContext<ClusterID> {
 					flinkConfig,
 					parallelism);
 
-			jobGraph.addJars(dependencies);
+			jobGraph.addJars(executionParameters.getJars());
 			jobGraph.setClasspaths(executionParameters.getClasspaths());
 			jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());