You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/14 09:48:15 UTC
[flink] 02/07: [hotfix] Move addJars() from ClientUtils to JobGraph
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 86b00ad0fa5ef998179f22aee62efdd635945872
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 11 11:15:37 2019 +0100
[hotfix] Move addJars() from ClientUtils to JobGraph
---
.../main/java/org/apache/flink/client/ClientUtils.java | 15 ---------------
.../java/org/apache/flink/client/RemoteExecutor.java | 2 +-
.../apache/flink/client/program/ContextEnvironment.java | 2 +-
.../flink/client/program/PackagedProgramUtils.java | 3 +--
.../org/apache/flink/client/program/ClientTest.java | 2 +-
.../org/apache/flink/runtime/jobgraph/JobGraph.java | 17 +++++++++++++++++
.../api/environment/StreamContextEnvironment.java | 2 +-
.../table/client/gateway/local/ExecutionContext.java | 3 +--
8 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 176d586..6245002 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -56,20 +55,6 @@ public enum ClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
- /**
- * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}. This will
- * throw an exception if a jar URL is not valid.
- */
- public static void addJarFiles(JobGraph jobGraph, List<URL> jarFilesToAttach) {
- for (URL jar : jarFilesToAttach) {
- try {
- jobGraph.addJar(new Path(jar.toURI()));
- } catch (URISyntaxException e) {
- throw new RuntimeException("URL is invalid. This should not happen.", e);
- }
- }
- }
-
public static void checkJarFile(URL jar) throws IOException {
File jarFile;
try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index b601367..71fde64 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -108,7 +108,7 @@ public class RemoteExecutor extends PlanExecutor {
clientConfiguration,
getDefaultParallelism());
- ClientUtils.addJarFiles(jobGraph, jarFiles);
+ jobGraph.addJars(jarFiles);
jobGraph.setClasspaths(globalClasspaths);
ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 8f9048c..08a02af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -84,7 +84,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
client.getFlinkConfiguration(),
getParallelism());
- ClientUtils.addJarFiles(jobGraph, this.jarFilesToAttach);
+ jobGraph.addJars(this.jarFilesToAttach);
jobGraph.setClasspaths(this.classpathsToAttach);
if (detached) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 2f27193..00ac231 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.CompilerException;
@@ -64,7 +63,7 @@ public class PackagedProgramUtils {
if (jobID != null) {
jobGraph.setJobID(jobID);
}
- ClientUtils.addJarFiles(jobGraph, packagedProgram.getAllLibraries());
+ jobGraph.addJars(packagedProgram.getAllLibraries());
jobGraph.setClasspaths(packagedProgram.getClasspaths());
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 78023d7..a1cc8a2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -160,7 +160,7 @@ public class ClientTest extends TestLogger {
new Configuration(),
1);
- ClientUtils.addJarFiles(jobGraph, Collections.emptyList());
+ jobGraph.addJars(Collections.emptyList());
jobGraph.setClasspaths(Collections.emptyList());
JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f63a518..dceb517 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.SerializedValue;
import java.io.IOException;
import java.io.Serializable;
+import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -474,6 +475,22 @@ public class JobGraph implements Serializable {
}
/**
+ * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}.
+ *
+ * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files to attach to the jobgraph.
+ * @throws RuntimeException if a jar URL is not valid.
+ */
+ public void addJars(final List<URL> jarFilesToAttach) {
+ for (URL jar : jarFilesToAttach) {
+ try {
+ addJar(new Path(jar.toURI()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("URL is invalid. This should not happen.", e);
+ }
+ }
+ }
+
+ /**
* Gets the list of assigned user jar paths.
*
* @return The list of assigned user jar paths
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 278a75b..bab31d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -51,7 +51,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
ctx.getClient().getFlinkConfiguration(),
getParallelism());
- ClientUtils.addJarFiles(jobGraph, ctx.getJars());
+ jobGraph.addJars(ctx.getJars());
jobGraph.setClasspaths(ctx.getClasspaths());
// running from the CLI will override the savepoint restore settings
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 692ebee..e37635f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CustomCommandLine;
@@ -488,7 +487,7 @@ public class ExecutionContext<ClusterID> {
flinkConfig,
parallelism);
- ClientUtils.addJarFiles(jobGraph, dependencies);
+ jobGraph.addJars(dependencies);
jobGraph.setClasspaths(executionParameters.getClasspaths());
jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());