You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/26 12:02:35 UTC
[flink] 01/02: [FLINK-9891] Added hook to shut down the cluster if a
session was created in per-job mode.
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 70b48d63e5aed6411ccbdb443af0222515331867
Author: Sebastian Klemke <pa...@nerdheim.de>
AuthorDate: Sat Aug 11 22:43:48 2018 +0200
[FLINK-9891] Added hook to shut down the cluster if a session was created in per-job mode.
This closes #6540.
---
.../java/org/apache/flink/client/cli/CliFrontend.java | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2e78e4a..780f814 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -249,13 +250,22 @@ public class CliFrontend {
LOG.info("Could not properly shut down the client.", e);
}
} else {
+ final Thread shutdownHook;
if (clusterId != null) {
client = clusterDescriptor.retrieve(clusterId);
+ shutdownHook = null;
} else {
// also in job mode we have to deploy a session cluster because the job
// might consist of multiple parts (e.g. when using collect)
final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+ // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
+ // there's a race-condition here if cli is killed before shutdown hook is installed
+ if (!runOptions.getDetachedMode()) {
+ shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
+ } else {
+ shutdownHook = null;
+ }
}
try {
@@ -278,12 +288,12 @@ public class CliFrontend {
executeProgram(program, client, userParallelism);
} finally {
- if (clusterId == null && !client.isDetached()) {
+ if (shutdownHook != null) {
// terminate the cluster only if we have started it before and if it's not detached
try {
- client.shutDownCluster();
- } catch (final Exception e) {
- LOG.info("Could not properly terminate the Flink cluster.", e);
+ shutdownHook.run();
+ } finally {
+ ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
}
}