You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/11 18:01:39 UTC
[flink] 01/02: [FLINK-21274][runtime] Change the
ClusterEntrypoint.runClusterEntrypoint to wait on the result of
clusterEntrypoint.getTerminationFuture().get() and do the System.exit
outside of the future callback
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a7f898ab3f6bc887c590abf6e2c6eab9a89d1d12
Author: wjc <wj...@foxmail.com>
AuthorDate: Tue Feb 9 11:07:51 2021 +0800
[FLINK-21274][runtime] Change the ClusterEntrypoint.runClusterEntrypoint to wait on the result of clusterEntrypoint.getTerminationFuture().get() and do the System.exit outside of the future callback
---
.../runtime/entrypoint/ClusterEntrypoint.java | 35 ++++++++++------------
1 file changed, 16 insertions(+), 19 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 6e6a604..35fa799 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -574,25 +574,22 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
- clusterEntrypoint
- .getTerminationFuture()
- .whenComplete(
- (applicationStatus, throwable) -> {
- final int returnCode;
-
- if (throwable != null) {
- returnCode = RUNTIME_FAILURE_RETURN_CODE;
- } else {
- returnCode = applicationStatus.processExitCode();
- }
-
- LOG.info(
- "Terminating cluster entrypoint process {} with exit code {}.",
- clusterEntrypointName,
- returnCode,
- throwable);
- System.exit(returnCode);
- });
+ int returnCode;
+ Throwable throwable = null;
+
+ try {
+ returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
+ } catch (Throwable e) {
+ throwable = ExceptionUtils.stripExecutionException(e);
+ returnCode = RUNTIME_FAILURE_RETURN_CODE;
+ }
+
+ LOG.info(
+ "Terminating cluster entrypoint process {} with exit code {}.",
+ clusterEntrypointName,
+ returnCode,
+ throwable);
+ System.exit(returnCode);
}
/** Execution mode of the {@link MiniDispatcher}. */