You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/07 12:17:33 UTC

[flink] branch master updated: [FLINK-26455][state/changelog] Don't fail on materialization cancellation

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b68da57  [FLINK-26455][state/changelog] Don't fail on materialization cancellation
b68da57 is described below

commit b68da579ed0b993d4c6e709205bebc0df4c05e1a
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Mar 3 00:57:54 2022 +0100

    [FLINK-26455][state/changelog] Don't fail on materialization cancellation
---
 .../apache/flink/state/changelog/PeriodicMaterializationManager.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index 9f13112..daa58d1 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -167,6 +168,10 @@ class PeriodicMaterializationManager implements Closeable {
                                         upTo);
 
                                 scheduleNextMaterialization();
+                            } else if (throwable instanceof CancellationException) {
+                                // can happen e.g. due to task cancellation
+                                LOG.info("materialization cancelled", throwable);
+                                scheduleNextMaterialization();
                             } else {
                                 // if failed
                                 int retryTime = numberOfConsecutiveFailures.incrementAndGet();