You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/07 12:17:33 UTC
[flink] branch master updated: [FLINK-26455][state/changelog] Don't fail on materialization cancellation
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b68da57 [FLINK-26455][state/changelog] Don't fail on materialization cancellation
b68da57 is described below
commit b68da579ed0b993d4c6e709205bebc0df4c05e1a
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Mar 3 00:57:54 2022 +0100
[FLINK-26455][state/changelog] Don't fail on materialization cancellation
---
.../apache/flink/state/changelog/PeriodicMaterializationManager.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index 9f13112..daa58d1 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -167,6 +168,10 @@ class PeriodicMaterializationManager implements Closeable {
upTo);
scheduleNextMaterialization();
+ } else if (throwable instanceof CancellationException) {
+ // can happen e.g. due to task cancellation
+ LOG.info("materialization cancelled", throwable);
+ scheduleNextMaterialization();
} else {
// if failed
int retryTime = numberOfConsecutiveFailures.incrementAndGet();