You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 15:53:36 UTC

[flink] 16/16: [FLINK-XXXXX] Fix job client lifecycle issue

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfc0e0c121c3c23b7f46fecd0fa2d0d0882227fc
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 16:51:17 2019 +0100

    [FLINK-XXXXX] Fix job client lifecycle issue
---
 .../client/deployment/executors/JobClientImpl.java |  5 +++++
 .../StandaloneSessionClusterExecutor.java          |  5 ++---
 .../org/apache/flink/core/execution/JobClient.java |  2 +-
 .../flink/api/java/ExecutionEnvironment.java       | 11 ++++++-----
 .../environment/StreamExecutionEnvironment.java    |  9 +++++----
 .../yarn/executors/YarnJobClusterExecutor.java     |  7 +++----
 .../yarn/executors/YarnSessionClusterExecutor.java | 23 +++++++---------------
 7 files changed, 29 insertions(+), 33 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
index e042369..c6f48bfe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -79,4 +79,9 @@ public class JobClientImpl<ClusterID> implements JobClient {
 		}));
 		return res;
 	}
+
+	@Override
+	public void close() throws Exception {
+		this.clusterClient.close();
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index e5cc82e..df4d2ba 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -66,9 +66,8 @@ public class StandaloneSessionClusterExecutor implements Executor {
 			final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
-				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-			}
+			final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 		}
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8440dd1..b4ab9a9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
  * A client that is scoped to a specific job.
  */
 @PublicEvolving
-public interface JobClient {
+public interface JobClient extends AutoCloseable {
 
 	CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 69abe17..c600eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -805,12 +805,13 @@ public class ExecutionEnvironment {
 
 		final Executor executor = executorFactory.getExecutor(configuration);
 
-		final JobClient jobClient = executor.execute(plan, configuration).get();
-		lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: jobClient.getJobSubmissionResult().get();
+		try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
 
-		return lastJobExecutionResult;
+			return lastJobExecutionResult;
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7a59d5a..fdaaae0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1562,10 +1562,11 @@ public class StreamExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		final JobClient jobClient = executor.execute(streamGraph, configuration).get();
-		return configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: jobClient.getJobSubmissionResult().get();
+		try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+			return configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 09094e7..ead6e9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -73,10 +73,9 @@ public class YarnJobClusterExecutor implements Executor {
 
 			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
 
-			try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
-				LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
-				return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
-			}
+			final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+			return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
 		}
 	}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index dd15d1b..5b72045 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -56,40 +56,31 @@ public class YarnSessionClusterExecutor implements Executor {
 
 	@Override
 	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-		final List<URL> dependencies = configAccessor.getJars();
-		final List<URL> classpaths = configAccessor.getClasspaths();
-
-		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+		final JobGraph jobGraph = getJobGraph(pipeline, configuration);
 
 		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
 			final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
-				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-			}
+			// TODO: 17.11.19 we cannot close the client here because we simply have a future of the client 
+			final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 		}
 	}
 
 	private JobGraph getJobGraph(
 			final Pipeline pipeline,
-			final Configuration configuration,
-			final List<URL> classpaths,
-			final List<URL> libraries) {
+			final Configuration configuration) {
 
 		checkNotNull(pipeline);
 		checkNotNull(configuration);
-		checkNotNull(classpaths);
-		checkNotNull(libraries);
 
 		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 		final JobGraph jobGraph = FlinkPipelineTranslationUtil
 				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
 
-		jobGraph.addJars(libraries);
-		jobGraph.setClasspaths(classpaths);
+		jobGraph.addJars(executionConfigAccessor.getJars());
+		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
 		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
 
 		return jobGraph;