You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/23 14:21:59 UTC

[flink] branch release-1.11 updated: [FLINK-18421][checkpointing][tests] Fix logging of RejectedExecutionException during CheckpointCoordinator shutdown

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d89baa3  [FLINK-18421][checkpointing][tests] Fix logging of RejectedExecutionException during CheckpointCoordinator shutdown
d89baa3 is described below

commit d89baa37dd85f99153c3aa39f0989db883c0e798
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jul 22 18:01:17 2020 +0200

    [FLINK-18421][checkpointing][tests] Fix logging of RejectedExecutionException during CheckpointCoordinator shutdown
---
 .../org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d44a382..76f9efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -78,6 +78,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -587,7 +588,7 @@ public class CheckpointCoordinator {
 					.exceptionally(error -> {
 						if (!isShutdown()) {
 							throw new CompletionException(error);
-						} else if (error instanceof RejectedExecutionException) {
+						} else if (findThrowable(error, RejectedExecutionException.class).isPresent()) {
 							LOG.debug("Execution rejected during shutdown");
 						} else {
 							LOG.warn("Error encountered during shutdown", error);
@@ -1745,7 +1746,7 @@ public class CheckpointCoordinator {
 		CheckpointFailureReason defaultReason, Throwable throwable) {
 
 		final Optional<CheckpointException> checkpointExceptionOptional =
-			ExceptionUtils.findThrowable(throwable, CheckpointException.class);
+			findThrowable(throwable, CheckpointException.class);
 		return checkpointExceptionOptional
 			.orElseGet(() -> new CheckpointException(defaultReason, throwable));
 	}