You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/08/26 11:57:36 UTC

[flink] 02/03: [hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 605a7118a018a716b7715834f5b3f63f3e7064af
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Aug 15 19:29:58 2022 +0200

    [hotfix][state] Fix logging in Materializer and make FLINK-28976 more explicit
    
    (cherry picked from commit 0e083c119b24a9b5fa7af1ad274c9ee7111c07cc)
---
 .../state/changelog/PeriodicMaterializationManager.java     | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index d51d7958cd3..66c75c670e9 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -114,7 +114,7 @@ class PeriodicMaterializationManager implements Closeable {
 
             LOG.info("Task {} starts periodic materialization", subtaskName);
 
-            scheduleNextMaterialization(initialDelay);
+            scheduleNextMaterialization(periodicMaterializeDelay + initialDelay);
         }
     }
 
@@ -255,22 +255,19 @@ class PeriodicMaterializationManager implements Closeable {
     }
 
     private void scheduleNextMaterialization() {
-        scheduleNextMaterialization(0);
+        scheduleNextMaterialization(periodicMaterializeDelay);
     }
 
     // task thread and asyncOperationsThreadPool can access this method
-    private synchronized void scheduleNextMaterialization(long offset) {
+    private synchronized void scheduleNextMaterialization(long delay) {
         if (started && !periodicExecutor.isShutdown()) {
 
             LOG.info(
                     "Task {} schedules the next materialization in {} seconds",
                     subtaskName,
-                    periodicMaterializeDelay / 1000);
+                    delay / 1000);
 
-            periodicExecutor.schedule(
-                    this::triggerMaterialization,
-                    periodicMaterializeDelay + offset,
-                    TimeUnit.MILLISECONDS);
+            periodicExecutor.schedule(this::triggerMaterialization, delay, TimeUnit.MILLISECONDS);
         }
     }