You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/07 09:10:31 UTC

flink git commit: [FLINK-9352] Choose initial checkpoint delay randomly to reduce I/O pressure

Repository: flink
Updated Branches:
  refs/heads/master f11352167 -> 9f736d192


[FLINK-9352] Choose initial checkpoint delay randomly to reduce I/O pressure

By choosing the initial checkpoint delay randomly from
[minPauseBetweenCheckpoints, baseInterval] we avoid that multiple restarting jobs
have synchronized checkpoints. This can cause otherwise significant I/O pressure.

This closes #6092.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f736d19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f736d19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f736d19

Branch: refs/heads/master
Commit: 9f736d1927c62d220a82931c4f5ffa4955910f27
Parents: f113521
Author: yanghua <ya...@gmail.com>
Authored: Tue May 29 15:59:48 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Jul 7 11:10:08 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/CheckpointCoordinator.java      | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f736d19/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 55e1ffe..803b2ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -59,6 +59,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -133,7 +134,7 @@ public class CheckpointCoordinator {
 	/** The max time (in ms) that a checkpoint may take */
 	private final long checkpointTimeout;
 
-	/** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
+	/** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
 	 * enforce minimum processing time between checkpoint attempts */
 	private final long minPauseBetweenCheckpointsNanos;
 
@@ -1173,9 +1174,10 @@ public class CheckpointCoordinator {
 			stopCheckpointScheduler();
 
 			periodicScheduling = true;
+			long initialDelay = ThreadLocalRandom.current().nextLong(
+				minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
 			currentPeriodicTrigger = timer.scheduleAtFixedRate(
-					new ScheduledTrigger(),
-					baseInterval, baseInterval, TimeUnit.MILLISECONDS);
+					new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
 		}
 	}