You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/10 17:29:07 UTC

[flink] 01/02: [FLINK-11253] Add shutdown hook for yarn session client

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1abf8d14d159a2330029082ed715cbf7dbcc08b3
Author: Tao Yang <ya...@alibaba-inc.com>
AuthorDate: Fri Jan 4 18:06:29 2019 +0800

    [FLINK-11253] Add shutdown hook for yarn session client
---
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 88 +++++++++++++---------
 1 file changed, 52 insertions(+), 36 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 9a05c16..673341e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -40,6 +40,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -650,48 +651,22 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 						yarnClusterDescriptor.getYarnClient(),
 						yarnApplicationId,
 						new ScheduledExecutorServiceAdapter(scheduledExecutorService));
-
+					Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
+						() -> shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService,
+							yarnApplicationStatusMonitor, yarnApplicationId),
+						getClass().getSimpleName(),
+						LOG);
 					try {
 						runInteractiveCli(
 							clusterClient,
 							yarnApplicationStatusMonitor,
 							acceptInteractiveInput);
 					} finally {
-						try {
-							yarnApplicationStatusMonitor.close();
-						} catch (Exception e) {
-							LOG.info("Could not properly close the Yarn application status monitor.", e);
-						}
-
-						clusterClient.shutDownCluster();
-
-						try {
-							clusterClient.shutdown();
-						} catch (Exception e) {
-							LOG.info("Could not properly shutdown cluster client.", e);
-						}
-
-						// shut down the scheduled executor service
-						ExecutorUtils.gracefulShutdown(
-							1000L,
-							TimeUnit.MILLISECONDS,
-							scheduledExecutorService);
-
-						deleteYarnPropertiesFile();
-
-						ApplicationReport applicationReport;
-
-						try {
-							applicationReport = yarnClusterDescriptor
-								.getYarnClient()
-								.getApplicationReport(yarnApplicationId);
-						} catch (YarnException | IOException e) {
-							LOG.info("Could not log the final application report.", e);
-							applicationReport = null;
-						}
-
-						if (applicationReport != null) {
-							logFinalApplicationReport(applicationReport);
+						shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService,
+							yarnApplicationStatusMonitor, yarnApplicationId);
+						if (shutdownHook != null) {
+							// we do not need the hook anymore as we have just tried to shutdown the cluster.
+							ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 						}
 					}
 				}
@@ -707,6 +682,47 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		return 0;
 	}
 
+	private void shutdownCluster(AbstractYarnClusterDescriptor yarnClusterDescriptor,
+		ClusterClient clusterClient, ScheduledExecutorService scheduledExecutorService,
+		YarnApplicationStatusMonitor yarnApplicationStatusMonitor, ApplicationId yarnApplicationId) { // teardown sequence shared by the finally block and the JVM shutdown hook
+		try { // close the status monitor first; a failure is only logged and the teardown continues
+			yarnApplicationStatusMonitor.close();
+		} catch (Exception e) {
+			LOG.info("Could not properly close the Yarn application status monitor.", e);
+		}
+		// Not guarded by try/catch: an exception thrown here skips every remaining step. NOTE(review): confirm this is intended.
+		clusterClient.shutDownCluster();
+		// Shutting down the client object itself is guarded: a failure is logged and the cleanup below still runs.
+		try {
+			clusterClient.shutdown();
+		} catch (Exception e) {
+			LOG.info("Could not properly shutdown cluster client.", e);
+		}
+
+		// Shut down the scheduled executor service, passing a 1000 ms timeout to the graceful-shutdown helper.
+		ExecutorUtils.gracefulShutdown(
+			1000L,
+			TimeUnit.MILLISECONDS,
+			scheduledExecutorService);
+
+		deleteYarnPropertiesFile();
+
+		ApplicationReport applicationReport;
+		// Fetch the application report through the Yarn client; if the lookup fails it is logged and no report is printed.
+		try {
+			applicationReport = yarnClusterDescriptor
+				.getYarnClient()
+				.getApplicationReport(yarnApplicationId);
+		} catch (YarnException | IOException e) {
+			LOG.info("Could not log the final application report.", e);
+			applicationReport = null;
+		}
+
+		if (applicationReport != null) {
+			logFinalApplicationReport(applicationReport);
+		}
+	}
+
 	private void logFinalApplicationReport(ApplicationReport appReport) {
 		LOG.info("Application " + appReport.getApplicationId() + " finished with state " + appReport
 			.getYarnApplicationState() + " and final state " + appReport