You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:11 UTC
[flink] 15/19: [FLINK-XXXXX] Fix job client lifecycle issue
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e74ecd236a513330307cb46d4dccd2763b25255
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 16:51:17 2019 +0100
[FLINK-XXXXX] Fix job client lifecycle issue
---
.../client/deployment/executors/JobClientImpl.java | 5 +++++
.../StandaloneSessionClusterExecutor.java | 5 ++---
.../org/apache/flink/core/execution/JobClient.java | 2 +-
.../flink/api/java/ExecutionEnvironment.java | 11 +++++-----
.../flink/api/java/ExecutorDiscoveryTest.java | 5 +++++
.../environment/StreamExecutionEnvironment.java | 9 ++++----
.../environment/ExecutorDiscoveryTest.java | 5 +++++
.../yarn/executors/YarnJobClusterExecutor.java | 7 +++---
.../yarn/executors/YarnSessionClusterExecutor.java | 25 ++++++----------------
9 files changed, 39 insertions(+), 35 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
index e042369..c6f48bfe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -79,4 +79,9 @@ public class JobClientImpl<ClusterID> implements JobClient {
}));
return res;
}
+
+ @Override
+ public void close() throws Exception {
+ this.clusterClient.close();
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index e5cc82e..df4d2ba 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -66,9 +66,8 @@ public class StandaloneSessionClusterExecutor implements Executor {
final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
- try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
- return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
- }
+ final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
+ return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8440dd1..b4ab9a9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
* A client that is scoped to a specific job.
*/
@PublicEvolving
-public interface JobClient {
+public interface JobClient extends AutoCloseable {
CompletableFuture<JobExecutionResult> getJobSubmissionResult();
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 69abe17..c600eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -805,12 +805,13 @@ public class ExecutionEnvironment {
final Executor executor = executorFactory.getExecutor(configuration);
- final JobClient jobClient = executor.execute(plan, configuration).get();
- lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
- ? jobClient.getJobExecutionResult(userClassloader).get()
- : jobClient.getJobSubmissionResult().get();
+ try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+ lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
- return lastJobExecutionResult;
+ return lastJobExecutionResult;
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 9acbf3d..f5c34a5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
}
+
+ @Override
+ public void close() {
+
+ }
});
};
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7a59d5a..fdaaae0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1562,10 +1562,11 @@ public class StreamExecutionEnvironment {
executorServiceLoader.getExecutorFactory(configuration);
final Executor executor = executorFactory.getExecutor(configuration);
- final JobClient jobClient = executor.execute(streamGraph, configuration).get();
- return configuration.getBoolean(DeploymentOptions.ATTACHED)
- ? jobClient.getJobExecutionResult(userClassloader).get()
- : jobClient.getJobSubmissionResult().get();
+ try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+ return configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index ce593c2..97e4517 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
}
+
+ @Override
+ public void close() {
+
+ }
});
};
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 09094e7..ead6e9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -73,10 +73,9 @@ public class YarnJobClusterExecutor implements Executor {
final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
- try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
- LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
- return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
- }
+ final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+ LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+ return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
}
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index dd15d1b..de4d148 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -33,8 +33,6 @@ import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import java.net.URL;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,40 +54,31 @@ public class YarnSessionClusterExecutor implements Executor {
@Override
public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
- final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
- final List<URL> dependencies = configAccessor.getJars();
- final List<URL> classpaths = configAccessor.getClasspaths();
-
- final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+ final JobGraph jobGraph = getJobGraph(pipeline, configuration);
try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
- try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
- return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
- }
+ // TODO: 17.11.19 we cannot close the cluster client here because all we have at this point is a future of the job client
+ final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
+ return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
}
}
private JobGraph getJobGraph(
final Pipeline pipeline,
- final Configuration configuration,
- final List<URL> classpaths,
- final List<URL> libraries) {
+ final Configuration configuration) {
checkNotNull(pipeline);
checkNotNull(configuration);
- checkNotNull(classpaths);
- checkNotNull(libraries);
final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
final JobGraph jobGraph = FlinkPipelineTranslationUtil
.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
- jobGraph.addJars(libraries);
- jobGraph.setClasspaths(classpaths);
+ jobGraph.addJars(executionConfigAccessor.getJars());
+ jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
return jobGraph;