You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/10/16 12:43:18 UTC

[flink] 07/15: [FLINK-14290] Use RemoteExecutor in RemoteStreamEnvironment

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85e5a77de130f4b064d3c70671f1fd7f3fc046e2
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 19 16:42:19 2019 +0200

    [FLINK-14290] Use RemoteExecutor in RemoteStreamEnvironment
---
 .../api/environment/RemoteStreamEnvironment.java      | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 13bde96..b393e5e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
@@ -225,7 +226,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	) throws ProgramInvocationException {
 		StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(jobName);
 		return executeRemotely(streamGraph,
-			streamExecutionEnvironment.getClass().getClassLoader(),
 			streamExecutionEnvironment.getConfig(),
 			jarFiles,
 			host,
@@ -242,7 +242,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 * @throws ProgramInvocationException
 	 */
 	private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
-		ClassLoader envClassLoader,
 		ExecutionConfig executionConfig,
 		List<URL> jarFiles,
 		String host,
@@ -255,8 +254,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(jarFiles, globalClasspaths, envClassLoader);
-
 		Configuration configuration = new Configuration();
 		configuration.addAll(clientConfiguration);
 
@@ -274,13 +271,18 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 				streamGraph.getJobGraph().getJobID(), e);
 		}
 
-		if (savepointRestoreSettings == null) {
-			savepointRestoreSettings = SavepointRestoreSettings.none();
+		if (savepointRestoreSettings != null) {
+			streamGraph.setSavepointRestoreSettings(savepointRestoreSettings);
 		}
 
 		try {
-			return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
-				.getJobExecutionResult();
+			final PlanExecutor executor = PlanExecutor.createRemoteExecutor(
+					host,
+					port,
+					clientConfiguration,
+					jarFiles,
+					globalClasspaths);
+			return executor.executePlan(streamGraph).getJobExecutionResult();
 		}
 		catch (ProgramInvocationException e) {
 			throw e;
@@ -318,7 +320,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	@Deprecated
 	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
 		return executeRemotely(streamGraph,
-			this.getClass().getClassLoader(),
 			getConfig(),
 			jarFiles,
 			host,