You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/08/26 11:57:36 UTC
[flink] 02/03: [hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 605a7118a018a716b7715834f5b3f63f3e7064af
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Aug 15 19:29:58 2022 +0200
[hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit
(cherry picked from commit 0e083c119b24a9b5fa7af1ad274c9ee7111c07cc)
---
.../state/changelog/PeriodicMaterializationManager.java | 13 +++++--------
1 file changed, 5 insertions(+), 8 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index d51d7958cd3..66c75c670e9 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -114,7 +114,7 @@ class PeriodicMaterializationManager implements Closeable {
LOG.info("Task {} starts periodic materialization", subtaskName);
- scheduleNextMaterialization(initialDelay);
+ scheduleNextMaterialization(periodicMaterializeDelay + initialDelay);
}
}
@@ -255,22 +255,19 @@ class PeriodicMaterializationManager implements Closeable {
}
private void scheduleNextMaterialization() {
- scheduleNextMaterialization(0);
+ scheduleNextMaterialization(periodicMaterializeDelay);
}
// task thread and asyncOperationsThreadPool can access this method
- private synchronized void scheduleNextMaterialization(long offset) {
+ private synchronized void scheduleNextMaterialization(long delay) {
if (started && !periodicExecutor.isShutdown()) {
LOG.info(
"Task {} schedules the next materialization in {} seconds",
subtaskName,
- periodicMaterializeDelay / 1000);
+ delay / 1000);
- periodicExecutor.schedule(
- this::triggerMaterialization,
- periodicMaterializeDelay + offset,
- TimeUnit.MILLISECONDS);
+ periodicExecutor.schedule(this::triggerMaterialization, delay, TimeUnit.MILLISECONDS);
}
}