You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/10 15:05:28 UTC
[flink] branch master updated: [FLINK-26306][state/changelog] Randomly offset materialization
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ed5e614 [FLINK-26306][state/changelog] Randomly offset materialization
ed5e614 is described below
commit ed5e6144441bfbc020f525f9c10fd29cb3d83cbf
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Mar 1 23:49:59 2022 +0100
[FLINK-26306][state/changelog] Randomly offset materialization
---
.../state/changelog/ChangelogStateBackend.java | 3 ++-
.../changelog/PeriodicMaterializationManager.java | 22 ++++++++++++++++++----
.../changelog/ChangelogStateBackendTestUtils.java | 3 ++-
3 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index a1a2016..863a533 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -247,7 +247,8 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab
env.failExternally(new AsynchronousException(message, exception)),
keyedStateBackend,
executionConfig.getPeriodicMaterializeIntervalMillis(),
- executionConfig.getMaterializationMaxAllowedFailures());
+ executionConfig.getMaterializationMaxAllowedFailures(),
+ operatorIdentifier);
// keyedStateBackend is responsible to close periodicMaterializationManager
// This indicates periodicMaterializationManager binds to the keyedStateBackend
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
index daa58d1..e20c109 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.MathUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -74,6 +75,8 @@ class PeriodicMaterializationManager implements Closeable {
/** Whether PeriodicMaterializationManager is started. */
private boolean started = false;
+ private final long initialDelay;
+
PeriodicMaterializationManager(
MailboxExecutor mailboxExecutor,
ExecutorService asyncOperationsThreadPool,
@@ -81,7 +84,8 @@ class PeriodicMaterializationManager implements Closeable {
AsyncExceptionHandler asyncExceptionHandler,
ChangelogKeyedStateBackend<?> keyedStateBackend,
long periodicMaterializeDelay,
- int allowedNumberOfFailures) {
+ int allowedNumberOfFailures,
+ String operatorSubtaskId) {
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
this.subtaskName = checkNotNull(subtaskName);
@@ -96,6 +100,10 @@ class PeriodicMaterializationManager implements Closeable {
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory(
"periodic-materialization-scheduler-" + subtaskName));
+
+ this.initialDelay =
+ // randomize initial delay to avoid thundering herd problem
+ MathUtils.murmurHash(operatorSubtaskId.hashCode()) % periodicMaterializeDelay;
}
public void start() {
@@ -105,7 +113,7 @@ class PeriodicMaterializationManager implements Closeable {
LOG.info("Task {} starts periodic materialization", subtaskName);
- scheduleNextMaterialization();
+ scheduleNextMaterialization(initialDelay);
}
}
@@ -245,8 +253,12 @@ class PeriodicMaterializationManager implements Closeable {
}
}
+ private void scheduleNextMaterialization() {
+ scheduleNextMaterialization(0);
+ }
+
// task thread and asyncOperationsThreadPool can access this method
- private synchronized void scheduleNextMaterialization() {
+ private synchronized void scheduleNextMaterialization(long offset) {
if (started && !periodicExecutor.isShutdown()) {
LOG.info(
@@ -255,7 +267,9 @@ class PeriodicMaterializationManager implements Closeable {
periodicMaterializeDelay / 1000);
periodicExecutor.schedule(
- this::triggerMaterialization, periodicMaterializeDelay, TimeUnit.MILLISECONDS);
+ this::triggerMaterialization,
+ periodicMaterializeDelay + offset,
+ TimeUnit.MILLISECONDS);
}
}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index cf71b3e..942a229 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -354,7 +354,8 @@ public class ChangelogStateBackendTestUtils {
(message, exception) -> asyncComplete.completeExceptionally(exception),
keyedBackend,
10,
- 1);
+ 1,
+ "testTask");
}
/** Dummy {@link CheckpointStorageAccess}. */