You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/10 17:29:07 UTC
[flink] 01/02: [FLINK-11253] Add shutdown hook for yarn session
client
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1abf8d14d159a2330029082ed715cbf7dbcc08b3
Author: Tao Yang <ya...@alibaba-inc.com>
AuthorDate: Fri Jan 4 18:06:29 2019 +0800
[FLINK-11253] Add shutdown hook for yarn session client
---
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 88 +++++++++++++---------
1 file changed, 52 insertions(+), 36 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 9a05c16..673341e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -40,6 +40,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -650,48 +651,22 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
yarnClusterDescriptor.getYarnClient(),
yarnApplicationId,
new ScheduledExecutorServiceAdapter(scheduledExecutorService));
-
+ Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
+ () -> shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService,
+ yarnApplicationStatusMonitor, yarnApplicationId),
+ getClass().getSimpleName(),
+ LOG);
try {
runInteractiveCli(
clusterClient,
yarnApplicationStatusMonitor,
acceptInteractiveInput);
} finally {
- try {
- yarnApplicationStatusMonitor.close();
- } catch (Exception e) {
- LOG.info("Could not properly close the Yarn application status monitor.", e);
- }
-
- clusterClient.shutDownCluster();
-
- try {
- clusterClient.shutdown();
- } catch (Exception e) {
- LOG.info("Could not properly shutdown cluster client.", e);
- }
-
- // shut down the scheduled executor service
- ExecutorUtils.gracefulShutdown(
- 1000L,
- TimeUnit.MILLISECONDS,
- scheduledExecutorService);
-
- deleteYarnPropertiesFile();
-
- ApplicationReport applicationReport;
-
- try {
- applicationReport = yarnClusterDescriptor
- .getYarnClient()
- .getApplicationReport(yarnApplicationId);
- } catch (YarnException | IOException e) {
- LOG.info("Could not log the final application report.", e);
- applicationReport = null;
- }
-
- if (applicationReport != null) {
- logFinalApplicationReport(applicationReport);
+ shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService,
+ yarnApplicationStatusMonitor, yarnApplicationId);
+ if (shutdownHook != null) {
+ // we do not need the hook anymore as we have just tried to shutdown the cluster.
+ ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
}
}
}
@@ -707,6 +682,47 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
return 0;
}
+ private void shutdownCluster(AbstractYarnClusterDescriptor yarnClusterDescriptor,
+ ClusterClient clusterClient, ScheduledExecutorService scheduledExecutorService,
+ YarnApplicationStatusMonitor yarnApplicationStatusMonitor, ApplicationId yarnApplicationId) {
+ try {
+ yarnApplicationStatusMonitor.close();
+ } catch (Exception e) {
+ LOG.info("Could not properly close the Yarn application status monitor.", e);
+ }
+
+ clusterClient.shutDownCluster();
+
+ try {
+ clusterClient.shutdown();
+ } catch (Exception e) {
+ LOG.info("Could not properly shutdown cluster client.", e);
+ }
+
+ // shut down the scheduled executor service
+ ExecutorUtils.gracefulShutdown(
+ 1000L,
+ TimeUnit.MILLISECONDS,
+ scheduledExecutorService);
+
+ deleteYarnPropertiesFile();
+
+ ApplicationReport applicationReport;
+
+ try {
+ applicationReport = yarnClusterDescriptor
+ .getYarnClient()
+ .getApplicationReport(yarnApplicationId);
+ } catch (YarnException | IOException e) {
+ LOG.info("Could not log the final application report.", e);
+ applicationReport = null;
+ }
+
+ if (applicationReport != null) {
+ logFinalApplicationReport(applicationReport);
+ }
+ }
+
private void logFinalApplicationReport(ApplicationReport appReport) {
LOG.info("Application " + appReport.getApplicationId() + " finished with state " + appReport
.getYarnApplicationState() + " and final state " + appReport