You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/14 09:48:15 UTC

[flink] 02/07: [hotfix] Move addJars() from ClientUtils to JobGraph

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86b00ad0fa5ef998179f22aee62efdd635945872
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 11 11:15:37 2019 +0100

    [hotfix] Move addJars() from ClientUtils to JobGraph
---
 .../main/java/org/apache/flink/client/ClientUtils.java  | 15 ---------------
 .../java/org/apache/flink/client/RemoteExecutor.java    |  2 +-
 .../apache/flink/client/program/ContextEnvironment.java |  2 +-
 .../flink/client/program/PackagedProgramUtils.java      |  3 +--
 .../org/apache/flink/client/program/ClientTest.java     |  2 +-
 .../org/apache/flink/runtime/jobgraph/JobGraph.java     | 17 +++++++++++++++++
 .../api/environment/StreamContextEnvironment.java       |  2 +-
 .../table/client/gateway/local/ExecutionContext.java    |  3 +--
 8 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 176d586..6245002 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -56,20 +55,6 @@ public enum ClientUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
 
-	/**
-	 * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}. This will
-	 * throw an exception if a jar URL is not valid.
-	 */
-	public static void addJarFiles(JobGraph jobGraph, List<URL> jarFilesToAttach) {
-		for (URL jar : jarFilesToAttach) {
-			try {
-				jobGraph.addJar(new Path(jar.toURI()));
-			} catch (URISyntaxException e) {
-				throw new RuntimeException("URL is invalid. This should not happen.", e);
-			}
-		}
-	}
-
 	public static void checkJarFile(URL jar) throws IOException {
 		File jarFile;
 		try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index b601367..71fde64 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -108,7 +108,7 @@ public class RemoteExecutor extends PlanExecutor {
 				clientConfiguration,
 				getDefaultParallelism());
 
-		ClientUtils.addJarFiles(jobGraph, jarFiles);
+		jobGraph.addJars(jarFiles);
 		jobGraph.setClasspaths(globalClasspaths);
 
 		ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 8f9048c..08a02af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -84,7 +84,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 				client.getFlinkConfiguration(),
 				getParallelism());
 
-		ClientUtils.addJarFiles(jobGraph, this.jarFilesToAttach);
+		jobGraph.addJars(this.jarFilesToAttach);
 		jobGraph.setClasspaths(this.classpathsToAttach);
 
 		if (detached) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 2f27193..00ac231 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.CompilerException;
@@ -64,7 +63,7 @@ public class PackagedProgramUtils {
 		if (jobID != null) {
 			jobGraph.setJobID(jobID);
 		}
-		ClientUtils.addJarFiles(jobGraph, packagedProgram.getAllLibraries());
+		jobGraph.addJars(packagedProgram.getAllLibraries());
 		jobGraph.setClasspaths(packagedProgram.getClasspaths());
 		jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 78023d7..a1cc8a2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -160,7 +160,7 @@ public class ClientTest extends TestLogger {
 				new Configuration(),
 				1);
 
-		ClientUtils.addJarFiles(jobGraph, Collections.emptyList());
+		jobGraph.addJars(Collections.emptyList());
 		jobGraph.setClasspaths(Collections.emptyList());
 
 		JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f63a518..dceb517 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -474,6 +475,22 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
+	 * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}.
+	 *
+	 * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files to attach to the jobgraph.
+	 * @throws RuntimeException if a jar URL is not valid.
+	 */
+	public void addJars(final List<URL> jarFilesToAttach) {
+		for (URL jar : jarFilesToAttach) {
+			try {
+				addJar(new Path(jar.toURI()));
+			} catch (URISyntaxException e) {
+				throw new RuntimeException("URL is invalid. This should not happen.", e);
+			}
+		}
+	}
+
+	/**
 	 * Gets the list of assigned user jar paths.
 	 *
 	 * @return The list of assigned user jar paths
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 278a75b..bab31d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -51,7 +51,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 				ctx.getClient().getFlinkConfiguration(),
 				getParallelism());
 
-		ClientUtils.addJarFiles(jobGraph, ctx.getJars());
+		jobGraph.addJars(ctx.getJars());
 		jobGraph.setClasspaths(ctx.getClasspaths());
 
 		// running from the CLI will override the savepoint restore settings
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 692ebee..e37635f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CustomCommandLine;
@@ -488,7 +487,7 @@ public class ExecutionContext<ClusterID> {
 					flinkConfig,
 					parallelism);
 
-			ClientUtils.addJarFiles(jobGraph, dependencies);
+			jobGraph.addJars(dependencies);
 			jobGraph.setClasspaths(executionParameters.getClasspaths());
 			jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());