You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:02 UTC
[flink] 06/19: [FLINK-XXXXX] Add methods to ClientUtils that do not
require userClassloader
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0109616c3b1b1fdd77fdb5d18785d995b73c2a37
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Nov 15 15:36:55 2019 +0100
[FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader
---
.../java/org/apache/flink/client/ClientUtils.java | 37 ++++++++++------------
1 file changed, 17 insertions(+), 20 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 2c80236..ac247ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.client;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.program.ClusterClient;
@@ -44,6 +45,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.jar.JarFile;
@@ -99,15 +101,20 @@ public enum ClientUtils {
return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns);
}
- public static JobExecutionResult submitJob(
- ClusterClient<?> client,
- JobGraph jobGraph) throws ProgramInvocationException {
- checkNotNull(client);
- checkNotNull(jobGraph);
+ public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+ return checkNotNull(client)
+ .submitJob(checkNotNull(jobGraph))
+ .thenApply(JobSubmissionResult::getJobID);
+ }
+
+ public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
+ return submitJobAndGetJobID(client, jobGraph)
+ .thenCompose(client::requestJobResult);
+ }
+
+ public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException {
try {
- return client
- .submitJob(jobGraph)
- .thenApply(JobSubmissionResult::getJobID)
+ return submitJobAndGetJobID(client, jobGraph)
.thenApply(DetachedJobExecutionResult::new)
.get();
} catch (InterruptedException | ExecutionException e) {
@@ -120,18 +127,11 @@ public enum ClientUtils {
ClusterClient<?> client,
JobGraph jobGraph,
ClassLoader classLoader) throws ProgramInvocationException {
- checkNotNull(client);
- checkNotNull(jobGraph);
checkNotNull(classLoader);
JobResult jobResult;
-
try {
- jobResult = client
- .submitJob(jobGraph)
- .thenApply(JobSubmissionResult::getJobID)
- .thenCompose(client::requestJobResult)
- .get();
+ jobResult = submitJobAndGetResult(client, jobGraph).get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);
throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
@@ -151,12 +151,9 @@ public enum ClientUtils {
final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
- final List<URL> jobJars = executionConfigAccessor.getJars();
- final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
-
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);