You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:11 UTC

[flink] 15/19: [FLINK-XXXXX] Fix job client lifecycle issue

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e74ecd236a513330307cb46d4dccd2763b25255
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 16:51:17 2019 +0100

    [FLINK-XXXXX] Fix job client lifecycle issue
---
 .../client/deployment/executors/JobClientImpl.java |  5 +++++
 .../StandaloneSessionClusterExecutor.java          |  5 ++---
 .../org/apache/flink/core/execution/JobClient.java |  2 +-
 .../flink/api/java/ExecutionEnvironment.java       | 11 +++++-----
 .../flink/api/java/ExecutorDiscoveryTest.java      |  5 +++++
 .../environment/StreamExecutionEnvironment.java    |  9 ++++----
 .../environment/ExecutorDiscoveryTest.java         |  5 +++++
 .../yarn/executors/YarnJobClusterExecutor.java     |  7 +++---
 .../yarn/executors/YarnSessionClusterExecutor.java | 25 ++++++----------------
 9 files changed, 39 insertions(+), 35 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
index e042369..c6f48bfe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -79,4 +79,9 @@ public class JobClientImpl<ClusterID> implements JobClient {
 		}));
 		return res;
 	}
+
+	@Override
+	public void close() throws Exception {
+		this.clusterClient.close();
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index e5cc82e..df4d2ba 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -66,9 +66,8 @@ public class StandaloneSessionClusterExecutor implements Executor {
 			final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
-				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-			}
+			final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 		}
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8440dd1..b4ab9a9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
  * A client that is scoped to a specific job.
  */
 @PublicEvolving
-public interface JobClient {
+public interface JobClient extends AutoCloseable {
 
 	CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 69abe17..c600eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -805,12 +805,13 @@ public class ExecutionEnvironment {
 
 		final Executor executor = executorFactory.getExecutor(configuration);
 
-		final JobClient jobClient = executor.execute(plan, configuration).get();
-		lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: jobClient.getJobSubmissionResult().get();
+		try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
 
-		return lastJobExecutionResult;
+			return lastJobExecutionResult;
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 9acbf3d..f5c34a5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
 					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
 						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
 					}
+
+					@Override
+					public void close() {
+
+					}
 				});
 			};
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7a59d5a..fdaaae0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1562,10 +1562,11 @@ public class StreamExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		final JobClient jobClient = executor.execute(streamGraph, configuration).get();
-		return configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: jobClient.getJobSubmissionResult().get();
+		try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+			return configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index ce593c2..97e4517 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
 					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
 						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
 					}
+
+					@Override
+					public void close() {
+
+					}
 				});
 			};
 		}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 09094e7..ead6e9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -73,10 +73,9 @@ public class YarnJobClusterExecutor implements Executor {
 
 			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
 
-			try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
-				LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
-				return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
-			}
+			final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+			return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
 		}
 	}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index dd15d1b..de4d148 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -33,8 +33,6 @@ import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,40 +54,31 @@ public class YarnSessionClusterExecutor implements Executor {
 
 	@Override
 	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-		final List<URL> dependencies = configAccessor.getJars();
-		final List<URL> classpaths = configAccessor.getClasspaths();
-
-		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+		final JobGraph jobGraph = getJobGraph(pipeline, configuration);
 
 		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
 			final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
-				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-			}
+			// TODO: 17.11.19 we cannot close the cluster client here: we only return a future of the job client, which may still need the cluster client after this method returns
+			final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 		}
 	}
 
 	private JobGraph getJobGraph(
 			final Pipeline pipeline,
-			final Configuration configuration,
-			final List<URL> classpaths,
-			final List<URL> libraries) {
+			final Configuration configuration) {
 
 		checkNotNull(pipeline);
 		checkNotNull(configuration);
-		checkNotNull(classpaths);
-		checkNotNull(libraries);
 
 		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 		final JobGraph jobGraph = FlinkPipelineTranslationUtil
 				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
 
-		jobGraph.addJars(libraries);
-		jobGraph.setClasspaths(classpaths);
+		jobGraph.addJars(executionConfigAccessor.getJars());
+		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
 		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
 
 		return jobGraph;