You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/10/16 12:43:18 UTC
[flink] 07/15: [FLINK-14290] Use RemoteExecutor in RemoteStreamEnvironment
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 85e5a77de130f4b064d3c70671f1fd7f3fc046e2
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 19 16:42:19 2019 +0200
[FLINK-14290] Use RemoteExecutor in RemoteStreamEnvironment
---
.../api/environment/RemoteStreamEnvironment.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 13bde96..b393e5e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
@@ -225,7 +226,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
) throws ProgramInvocationException {
StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(jobName);
return executeRemotely(streamGraph,
- streamExecutionEnvironment.getClass().getClassLoader(),
streamExecutionEnvironment.getConfig(),
jarFiles,
host,
@@ -242,7 +242,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
* @throws ProgramInvocationException
*/
private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
- ClassLoader envClassLoader,
ExecutionConfig executionConfig,
List<URL> jarFiles,
String host,
@@ -255,8 +254,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
LOG.info("Running remotely at {}:{}", host, port);
}
- ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(jarFiles, globalClasspaths, envClassLoader);
-
Configuration configuration = new Configuration();
configuration.addAll(clientConfiguration);
@@ -274,13 +271,18 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
streamGraph.getJobGraph().getJobID(), e);
}
- if (savepointRestoreSettings == null) {
- savepointRestoreSettings = SavepointRestoreSettings.none();
+ if (savepointRestoreSettings != null) {
+ streamGraph.setSavepointRestoreSettings(savepointRestoreSettings);
}
try {
- return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
- .getJobExecutionResult();
+ final PlanExecutor executor = PlanExecutor.createRemoteExecutor(
+ host,
+ port,
+ clientConfiguration,
+ jarFiles,
+ globalClasspaths);
+ return executor.executePlan(streamGraph).getJobExecutionResult();
}
catch (ProgramInvocationException e) {
throw e;
@@ -318,7 +320,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
@Deprecated
protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
return executeRemotely(streamGraph,
- this.getClass().getClassLoader(),
getConfig(),
jarFiles,
host,