You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/17 17:35:14 UTC

[flink] branch release-1.11 updated: [FLINK-18290][checkpointing] Don't System.exit on CheckpointCoordinator failure if it is shut down

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4a6825b  [FLINK-18290][checkpointing] Don't System.exit on CheckpointCoordinator failure if it is shut down
4a6825b is described below

commit 4a6825b843a22f84c266a826fe7cabfac2ed15fd
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jun 17 12:32:12 2020 +0200

    [FLINK-18290][checkpointing] Don't System.exit on CheckpointCoordinator failure if it is shut down
---
 .../flink/runtime/checkpoint/CheckpointCoordinator.java     | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d535591..d44a382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -71,6 +71,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -582,7 +583,17 @@ public class CheckpointCoordinator {
 
 							return null;
 						},
-						timer));
+						timer)
+					.exceptionally(error -> {
+						if (!isShutdown()) {
+							throw new CompletionException(error);
+						} else if (error instanceof RejectedExecutionException) {
+							LOG.debug("Execution rejected during shutdown");
+						} else {
+							LOG.warn("Error encountered during shutdown", error);
+						}
+						return null;
+					}));
 		} catch (Throwable throwable) {
 			onTriggerFailure(request, throwable);
 		}