You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/23 14:21:59 UTC
[flink] branch release-1.11 updated:
[FLINK-18421][checkpointing][tests] Fix logging of
RejectedExecutionException during CheckpointCoordinator shutdown
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new d89baa3 [FLINK-18421][checkpointing][tests] Fix logging of RejectedExecutionException during CheckpointCoordinator shutdown
d89baa3 is described below
commit d89baa37dd85f99153c3aa39f0989db883c0e798
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jul 22 18:01:17 2020 +0200
[FLINK-18421][checkpointing][tests] Fix logging of RejectedExecutionException during CheckpointCoordinator shutdown
---
.../org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d44a382..76f9efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -78,6 +78,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -587,7 +588,7 @@ public class CheckpointCoordinator {
.exceptionally(error -> {
if (!isShutdown()) {
throw new CompletionException(error);
- } else if (error instanceof RejectedExecutionException) {
+ } else if (findThrowable(error, RejectedExecutionException.class).isPresent()) {
LOG.debug("Execution rejected during shutdown");
} else {
LOG.warn("Error encountered during shutdown", error);
@@ -1745,7 +1746,7 @@ public class CheckpointCoordinator {
CheckpointFailureReason defaultReason, Throwable throwable) {
final Optional<CheckpointException> checkpointExceptionOptional =
- ExceptionUtils.findThrowable(throwable, CheckpointException.class);
+ findThrowable(throwable, CheckpointException.class);
return checkpointExceptionOptional
.orElseGet(() -> new CheckpointException(defaultReason, throwable));
}