You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:02 UTC

[flink] 06/19: [FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0109616c3b1b1fdd77fdb5d18785d995b73c2a37
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Nov 15 15:36:55 2019 +0100

    [FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader
---
 .../java/org/apache/flink/client/ClientUtils.java  | 37 ++++++++++------------
 1 file changed, 17 insertions(+), 20 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 2c80236..ac247ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
@@ -44,6 +45,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.jar.JarFile;
@@ -99,15 +101,20 @@ public enum ClientUtils {
 		return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns);
 	}
 
-	public static JobExecutionResult submitJob(
-			ClusterClient<?> client,
-			JobGraph jobGraph) throws ProgramInvocationException {
-		checkNotNull(client);
-		checkNotNull(jobGraph);
+	public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+		return checkNotNull(client)
+				.submitJob(checkNotNull(jobGraph))
+				.thenApply(JobSubmissionResult::getJobID);
+	}
+
+	public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
+		return submitJobAndGetJobID(client, jobGraph)
+				.thenCompose(client::requestJobResult);
+	}
+
+	public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException {
 		try {
-			return client
-				.submitJob(jobGraph)
-				.thenApply(JobSubmissionResult::getJobID)
+			return submitJobAndGetJobID(client, jobGraph)
 				.thenApply(DetachedJobExecutionResult::new)
 				.get();
 		} catch (InterruptedException | ExecutionException e) {
@@ -120,18 +127,11 @@ public enum ClientUtils {
 			ClusterClient<?> client,
 			JobGraph jobGraph,
 			ClassLoader classLoader) throws ProgramInvocationException {
-		checkNotNull(client);
-		checkNotNull(jobGraph);
 		checkNotNull(classLoader);
 
 		JobResult jobResult;
-
 		try {
-			jobResult = client
-				.submitJob(jobGraph)
-				.thenApply(JobSubmissionResult::getJobID)
-				.thenCompose(client::requestJobResult)
-				.get();
+			jobResult = submitJobAndGetResult(client, jobGraph).get();
 		} catch (InterruptedException | ExecutionException e) {
 			ExceptionUtils.checkInterrupted(e);
 			throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
@@ -151,12 +151,9 @@ public enum ClientUtils {
 
 		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 
-		final List<URL> jobJars = executionConfigAccessor.getJars();
-		final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
-
 		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
 		try {
 			Thread.currentThread().setContextClassLoader(userCodeClassLoader);