You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/29 16:44:42 UTC

[flink] branch release-1.15 updated: [FLINK-26673][changelog] Disable periodic materialization when periodicMaterializeDelay is negative

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new f59404f  [FLINK-26673][changelog] Disable periodic materialization when periodicMaterializeDelay is negative
f59404f is described below

commit f59404fe646590abcfbb4a512ea6762f43108646
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Mon Mar 21 11:22:54 2022 +0800

    [FLINK-26673][changelog] Disable periodic materialization when periodicMaterializeDelay is negative
---
 docs/layouts/shortcodes/generated/state_backend_changelog_section.html | 2 +-
 docs/layouts/shortcodes/generated/state_changelog_configuration.html   | 2 +-
 .../java/org/apache/flink/configuration/StateChangelogOptions.java     | 3 ++-
 .../apache/flink/state/changelog/PeriodicMaterializationManager.java   | 3 ++-
 .../test/checkpointing/ChangelogPeriodicMaterializationITCase.java     | 2 +-
 5 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html b/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
index 405133d..a5b41d3 100644
--- a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
@@ -24,7 +24,7 @@
             <td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
             <td style="word-wrap: break-word;">10 min</td>
             <td>Duration</td>
-            <td>Defines the interval in milliseconds to perform periodic materialization for state backend.</td>
+            <td>Defines the interval in milliseconds to perform periodic materialization for state backend. The periodic materialization will be disabled when the value is negative</td>
         </tr>
         <tr>
             <td><h5>state.backend.changelog.storage</h5></td>
diff --git a/docs/layouts/shortcodes/generated/state_changelog_configuration.html b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
index 405133d..a5b41d3 100644
--- a/docs/layouts/shortcodes/generated/state_changelog_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
@@ -24,7 +24,7 @@
             <td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
             <td style="word-wrap: break-word;">10 min</td>
             <td>Duration</td>
-            <td>Defines the interval in milliseconds to perform periodic materialization for state backend.</td>
+            <td>Defines the interval in milliseconds to perform periodic materialization for state backend. The periodic materialization will be disabled when the value is negative</td>
         </tr>
         <tr>
             <td><h5>state.backend.changelog.storage</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
index 588e670..ef2fc62 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
@@ -34,7 +34,8 @@ public class StateChangelogOptions {
                     .defaultValue(Duration.ofMinutes(10))
                     .withDescription(
                             "Defines the interval in milliseconds to perform "
-                                    + "periodic materialization for state backend.");
+                                    + "periodic materialization for state backend. "
+                                    + "The periodic materialization will be disabled when the value is negative");
 
     @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
     public static final ConfigOption<Integer> MATERIALIZATION_MAX_FAILURES_ALLOWED =
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index e20c109..d51d795 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -107,7 +107,8 @@ class PeriodicMaterializationManager implements Closeable {
     }
 
     public void start() {
-        if (!started) {
+        // disable periodic materialization when periodicMaterializeDelay is negative
+        if (!started && periodicMaterializeDelay >= 0) {
 
             started = true;
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
index f8e063b..fb4f875 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
@@ -49,7 +49,7 @@ public class ChangelogPeriodicMaterializationITCase
         SharedReference<AtomicBoolean> hasMaterialization =
                 sharedObjects.add(new AtomicBoolean(true));
         StreamExecutionEnvironment env =
-                getEnv(delegatedStateBackend, checkpointFolder, 1000, 1, Long.MAX_VALUE, 0);
+                getEnv(delegatedStateBackend, checkpointFolder, 1000, 1, -1, 0);
         waitAndAssert(
                 buildJobGraph(
                         env,