You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/10 15:05:28 UTC

[flink] branch master updated: [FLINK-26306][state/changelog] Randomly offset materialization

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ed5e614  [FLINK-26306][state/changelog] Randomly offset materialization
ed5e614 is described below

commit ed5e6144441bfbc020f525f9c10fd29cb3d83cbf
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Mar 1 23:49:59 2022 +0100

    [FLINK-26306][state/changelog] Randomly offset materialization
---
 .../state/changelog/ChangelogStateBackend.java     |  3 ++-
 .../changelog/PeriodicMaterializationManager.java  | 22 ++++++++++++++++++----
 .../changelog/ChangelogStateBackendTestUtils.java  |  3 ++-
 3 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index a1a2016..863a533 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -247,7 +247,8 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab
                                 env.failExternally(new AsynchronousException(message, exception)),
                         keyedStateBackend,
                         executionConfig.getPeriodicMaterializeIntervalMillis(),
-                        executionConfig.getMaterializationMaxAllowedFailures());
+                        executionConfig.getMaterializationMaxAllowedFailures(),
+                        operatorIdentifier);
 
         // keyedStateBackend is responsible to close periodicMaterializationManager
         // This indicates periodicMaterializationManager binds to the keyedStateBackend
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index daa58d1..e20c109 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -74,6 +75,8 @@ class PeriodicMaterializationManager implements Closeable {
     /** Whether PeriodicMaterializationManager is started. */
     private boolean started = false;
 
+    private final long initialDelay;
+
     PeriodicMaterializationManager(
             MailboxExecutor mailboxExecutor,
             ExecutorService asyncOperationsThreadPool,
@@ -81,7 +84,8 @@ class PeriodicMaterializationManager implements Closeable {
             AsyncExceptionHandler asyncExceptionHandler,
             ChangelogKeyedStateBackend<?> keyedStateBackend,
             long periodicMaterializeDelay,
-            int allowedNumberOfFailures) {
+            int allowedNumberOfFailures,
+            String operatorSubtaskId) {
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
         this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
         this.subtaskName = checkNotNull(subtaskName);
@@ -96,6 +100,10 @@ class PeriodicMaterializationManager implements Closeable {
                 Executors.newSingleThreadScheduledExecutor(
                         new ExecutorThreadFactory(
                                 "periodic-materialization-scheduler-" + subtaskName));
+
+        this.initialDelay =
+                // randomize initial delay to avoid thundering herd problem
+                MathUtils.murmurHash(operatorSubtaskId.hashCode()) % periodicMaterializeDelay;
     }
 
     public void start() {
@@ -105,7 +113,7 @@ class PeriodicMaterializationManager implements Closeable {
 
             LOG.info("Task {} starts periodic materialization", subtaskName);
 
-            scheduleNextMaterialization();
+            scheduleNextMaterialization(initialDelay);
         }
     }
 
@@ -245,8 +253,12 @@ class PeriodicMaterializationManager implements Closeable {
         }
     }
 
+    private void scheduleNextMaterialization() {
+        scheduleNextMaterialization(0);
+    }
+
     // task thread and asyncOperationsThreadPool can access this method
-    private synchronized void scheduleNextMaterialization() {
+    private synchronized void scheduleNextMaterialization(long offset) {
         if (started && !periodicExecutor.isShutdown()) {
 
             LOG.info(
@@ -255,7 +267,9 @@ class PeriodicMaterializationManager implements Closeable {
                     periodicMaterializeDelay / 1000);
 
             periodicExecutor.schedule(
-                    this::triggerMaterialization, periodicMaterializeDelay, TimeUnit.MILLISECONDS);
+                    this::triggerMaterialization,
+                    periodicMaterializeDelay + offset,
+                    TimeUnit.MILLISECONDS);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index cf71b3e..942a229 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -354,7 +354,8 @@ public class ChangelogStateBackendTestUtils {
                 (message, exception) -> asyncComplete.completeExceptionally(exception),
                 keyedBackend,
                 10,
-                1);
+                1,
+                "testTask");
     }
 
     /** Dummy {@link CheckpointStorageAccess}. */