You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 15:53:36 UTC
[flink] 16/16: [FLINK-XXXXX] Fix job client lifecycle issue
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bfc0e0c121c3c23b7f46fecd0fa2d0d0882227fc
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 16:51:17 2019 +0100
[FLINK-XXXXX] Fix job client lifecycle issue
---
.../client/deployment/executors/JobClientImpl.java | 5 +++++
.../StandaloneSessionClusterExecutor.java | 5 ++---
.../org/apache/flink/core/execution/JobClient.java | 2 +-
.../flink/api/java/ExecutionEnvironment.java | 11 ++++++-----
.../environment/StreamExecutionEnvironment.java | 9 +++++----
.../yarn/executors/YarnJobClusterExecutor.java | 7 +++----
.../yarn/executors/YarnSessionClusterExecutor.java | 23 +++++++---------------
7 files changed, 29 insertions(+), 33 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
index e042369..c6f48bfe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -79,4 +79,9 @@ public class JobClientImpl<ClusterID> implements JobClient {
}));
return res;
}
+
+ @Override
+ public void close() throws Exception {
+ this.clusterClient.close();
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index e5cc82e..df4d2ba 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -66,9 +66,8 @@ public class StandaloneSessionClusterExecutor implements Executor {
final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
- try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
- return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
- }
+ final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
+ return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8440dd1..b4ab9a9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
* A client that is scoped to a specific job.
*/
@PublicEvolving
-public interface JobClient {
+public interface JobClient extends AutoCloseable {
CompletableFuture<JobExecutionResult> getJobSubmissionResult();
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 69abe17..c600eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -805,12 +805,13 @@ public class ExecutionEnvironment {
final Executor executor = executorFactory.getExecutor(configuration);
- final JobClient jobClient = executor.execute(plan, configuration).get();
- lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
- ? jobClient.getJobExecutionResult(userClassloader).get()
- : jobClient.getJobSubmissionResult().get();
+ try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+ lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
- return lastJobExecutionResult;
+ return lastJobExecutionResult;
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7a59d5a..fdaaae0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1562,10 +1562,11 @@ public class StreamExecutionEnvironment {
executorServiceLoader.getExecutorFactory(configuration);
final Executor executor = executorFactory.getExecutor(configuration);
- final JobClient jobClient = executor.execute(streamGraph, configuration).get();
- return configuration.getBoolean(DeploymentOptions.ATTACHED)
- ? jobClient.getJobExecutionResult(userClassloader).get()
- : jobClient.getJobSubmissionResult().get();
+ try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+ return configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 09094e7..ead6e9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -73,10 +73,9 @@ public class YarnJobClusterExecutor implements Executor {
final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
- try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
- LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
- return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
- }
+ final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+ LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+ return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
}
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index dd15d1b..5b72045 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -56,40 +56,31 @@ public class YarnSessionClusterExecutor implements Executor {
@Override
public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
- final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
- final List<URL> dependencies = configAccessor.getJars();
- final List<URL> classpaths = configAccessor.getClasspaths();
-
- final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+ final JobGraph jobGraph = getJobGraph(pipeline, configuration);
try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
- try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
- return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
- }
+ // TODO: 17.11.19 we cannot close the cluster client here because we only return a future of the job client
+ final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
+ return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
}
}
private JobGraph getJobGraph(
final Pipeline pipeline,
- final Configuration configuration,
- final List<URL> classpaths,
- final List<URL> libraries) {
+ final Configuration configuration) {
checkNotNull(pipeline);
checkNotNull(configuration);
- checkNotNull(classpaths);
- checkNotNull(libraries);
final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
final JobGraph jobGraph = FlinkPipelineTranslationUtil
.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
- jobGraph.addJars(libraries);
- jobGraph.setClasspaths(classpaths);
+ jobGraph.addJars(executionConfigAccessor.getJars());
+ jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
return jobGraph;