You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/29 16:44:42 UTC
[flink] branch release-1.15 updated: [FLINK-26673][changelog] Disable periodic materialization when periodicMaterializeDelay is negative
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new f59404f [FLINK-26673][changelog] Disable periodic materialization when periodicMaterializeDelay is negative
f59404f is described below
commit f59404fe646590abcfbb4a512ea6762f43108646
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Mon Mar 21 11:22:54 2022 +0800
[FLINK-26673][changelog] Disable periodic materialization when periodicMaterializeDelay is negative
---
docs/layouts/shortcodes/generated/state_backend_changelog_section.html | 2 +-
docs/layouts/shortcodes/generated/state_changelog_configuration.html | 2 +-
.../java/org/apache/flink/configuration/StateChangelogOptions.java | 3 ++-
.../apache/flink/state/changelog/PeriodicMaterializationManager.java | 3 ++-
.../test/checkpointing/ChangelogPeriodicMaterializationITCase.java | 2 +-
5 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html b/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
index 405133d..a5b41d3 100644
--- a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
@@ -24,7 +24,7 @@
<td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
- <td>Defines the interval in milliseconds to perform periodic materialization for state backend.</td>
+ <td>Defines the interval in milliseconds to perform periodic materialization for state backend. The periodic materialization will be disabled when the value is negative</td>
</tr>
<tr>
<td><h5>state.backend.changelog.storage</h5></td>
diff --git a/docs/layouts/shortcodes/generated/state_changelog_configuration.html b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
index 405133d..a5b41d3 100644
--- a/docs/layouts/shortcodes/generated/state_changelog_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
@@ -24,7 +24,7 @@
<td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
- <td>Defines the interval in milliseconds to perform periodic materialization for state backend.</td>
+ <td>Defines the interval in milliseconds to perform periodic materialization for state backend. The periodic materialization will be disabled when the value is negative</td>
</tr>
<tr>
<td><h5>state.backend.changelog.storage</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
index 588e670..ef2fc62 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
@@ -34,7 +34,8 @@ public class StateChangelogOptions {
.defaultValue(Duration.ofMinutes(10))
.withDescription(
"Defines the interval in milliseconds to perform "
- + "periodic materialization for state backend.");
+ + "periodic materialization for state backend. "
+ + "The periodic materialization will be disabled when the value is negative");
@Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
public static final ConfigOption<Integer> MATERIALIZATION_MAX_FAILURES_ALLOWED =
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index e20c109..d51d795 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -107,7 +107,8 @@ class PeriodicMaterializationManager implements Closeable {
}
public void start() {
- if (!started) {
+ // disable periodic materialization when periodicMaterializeDelay is negative
+ if (!started && periodicMaterializeDelay >= 0) {
started = true;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
index f8e063b..fb4f875 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
@@ -49,7 +49,7 @@ public class ChangelogPeriodicMaterializationITCase
SharedReference<AtomicBoolean> hasMaterialization =
sharedObjects.add(new AtomicBoolean(true));
StreamExecutionEnvironment env =
- getEnv(delegatedStateBackend, checkpointFolder, 1000, 1, Long.MAX_VALUE, 0);
+ getEnv(delegatedStateBackend, checkpointFolder, 1000, 1, -1, 0);
waitAndAssert(
buildJobGraph(
env,