You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/07 09:10:31 UTC
flink git commit: [FLINK-9352] Choose initial checkpoint delay
randomly to reduce I/O pressure
Repository: flink
Updated Branches:
refs/heads/master f11352167 -> 9f736d192
[FLINK-9352] Choose initial checkpoint delay randomly to reduce I/O pressure
By choosing the initial checkpoint delay randomly from
[minPauseBetweenCheckpoints, baseInterval], we prevent multiple restarting jobs
from having synchronized checkpoints, which would otherwise cause significant I/O pressure.
This closes #6092.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f736d19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f736d19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f736d19
Branch: refs/heads/master
Commit: 9f736d1927c62d220a82931c4f5ffa4955910f27
Parents: f113521
Author: yanghua <ya...@gmail.com>
Authored: Tue May 29 15:59:48 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Jul 7 11:10:08 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/checkpoint/CheckpointCoordinator.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f736d19/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 55e1ffe..803b2ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -59,6 +59,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -133,7 +134,7 @@ public class CheckpointCoordinator {
/** The max time (in ms) that a checkpoint may take */
private final long checkpointTimeout;
- /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
+ /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
private final long minPauseBetweenCheckpointsNanos;
@@ -1173,9 +1174,10 @@ public class CheckpointCoordinator {
stopCheckpointScheduler();
periodicScheduling = true;
+ long initialDelay = ThreadLocalRandom.current().nextLong(
+ minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
currentPeriodicTrigger = timer.scheduleAtFixedRate(
- new ScheduledTrigger(),
- baseInterval, baseInterval, TimeUnit.MILLISECONDS);
+ new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
}
}