You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/11 18:37:01 UTC
flink git commit: [FLINK-3492] Configurable interval between Checkpoints
Repository: flink
Updated Branches:
refs/heads/master ef7f9ac9a -> 2e63d1afb
[FLINK-3492] Configurable interval between Checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e63d1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e63d1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e63d1af
Branch: refs/heads/master
Commit: 2e63d1afb2358760109ba0c90011ef565eaae0ff
Parents: ef7f9ac
Author: zentol <ch...@apache.org>
Authored: Thu Mar 17 12:27:29 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 11 18:36:21 2016 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 14 +++
.../checkpoint/CheckpointCoordinatorTest.java | 89 +++++++++++++++++++-
.../api/environment/CheckpointConfig.java | 34 ++++----
3 files changed, 117 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d1af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index edeab6a..8a856bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -127,6 +127,9 @@ public class CheckpointCoordinator {
private ScheduledTrigger currentPeriodicTrigger;
+ /** Timestamp with which the most recent checkpoint was actually triggered; used to enforce the minimum pause between checkpoints. */
+ private long lastTriggeredCheckpoint;
+
/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
* Non-volatile, because only accessed in synchronized scope */
private boolean periodicScheduling;
@@ -370,6 +373,23 @@ public class CheckpointCoordinator {
}
return false;
}
+
+ // make sure the minimum pause since the last triggered checkpoint has passed
+ if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger = null;
+ }
+ // Re-arm the periodic trigger for the remaining pause (never more than the full pause, in
+ // case the clock went backwards). Keep it in currentPeriodicTrigger: an untracked trigger can
+ // never be cancelled again, so every further rejection would add another live trigger.
+ if (periodicScheduling) {
+ long remainingPause = Math.min(minPauseBetweenCheckpoints, lastTriggeredCheckpoint + minPauseBetweenCheckpoints - timestamp);
+ currentPeriodicTrigger = new ScheduledTrigger();
+ timer.scheduleAtFixedRate(currentPeriodicTrigger, remainingPause, baseInterval);
+ }
+ return false;
+ }
}
// first check if all tasks that we need to trigger are running.
@@ -403,6 +423,7 @@ public class CheckpointCoordinator {
// we will actually trigger this checkpoint!
+ lastTriggeredCheckpoint = timestamp;
final long checkpointID;
if (nextCheckpointId < 0) {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d1af/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d385d73..266cf52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -30,9 +30,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -43,7 +41,12 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
@@ -1027,6 +1030,85 @@ public class CheckpointCoordinatorTest {
}
}
+ /**
+ * Verifies that, once a checkpoint has been triggered, no further checkpoint is triggered
+ * before the configured minimum pause between checkpoints has elapsed, even though the
+ * periodic trigger interval is much shorter than that pause.
+ */
+ @Test
+ public void testMinInterval() throws Exception {
+ final JobID jid = new JobID();
+
+ // create a mock execution vertex that counts the checkpoint trigger messages it receives
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+ final AtomicInteger numCalls = new AtomicInteger();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ if (invocation.getArguments()[0] instanceof TriggerCheckpoint) {
+ numCalls.incrementAndGet();
+ }
+ return null;
+ }
+ }).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 10, // periodic interval is 10 ms
+ 200000, // timeout is very long (200 s)
+ 500, // 500ms delay between checkpoints
+ 10,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl),
+ RecoveryMode.STANDALONE,
+ new DisabledCheckpointStatsTracker());
+
+ try {
+ coord.startCheckpointScheduler();
+
+ // wait until the first checkpoint was triggered
+ for (int x = 0; x < 20; x++) {
+ Thread.sleep(100);
+ if (numCalls.get() > 0) {
+ break;
+ }
+ }
+
+ if (numCalls.get() == 0) {
+ fail("No checkpoint was triggered within the first 2000 ms.");
+ }
+
+ long start = System.currentTimeMillis();
+
+ for (int x = 0; x < 20; x++) {
+ Thread.sleep(100);
+ int triggeredCheckpoints = numCalls.get();
+ long curT = System.currentTimeMillis();
+
+ // Within a time frame T at most T/500 checkpoints may be triggered because of the
+ // configured minimum pause between checkpoints. This bound does not account for the
+ // first checkpoint, which was triggered before 'start' was taken (=> +1). Furthermore,
+ // the pause boundaries are not aligned with our time measurement (=> +1). For example,
+ // for T = 1200 ms at most 4 checkpoints may have been triggered in total.
+ long maxAllowedCheckpoints = (curT - start) / 500 + 2;
+ assertTrue("Triggered " + triggeredCheckpoints + " checkpoints within " + (curT - start)
+ + " ms, but at most " + maxAllowedCheckpoints + " are allowed",
+ maxAllowedCheckpoints >= triggeredCheckpoints);
+ }
+ }
+ finally {
+ coord.stopCheckpointScheduler();
+ coord.shutdown();
+ }
+ }
+
@Test
public void testMaxConcurrentAttempts1() {
testMaxConcurrentAttemps(1);
http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d1af/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 327b524..cdeef94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -153,23 +153,26 @@ public class CheckpointConfig implements java.io.Serializable {
return minPauseBetweenCheckpoints;
}
-// /**
-// * Sets the minimal pause between checkpointing attempts. This setting defines how soon the
-// * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
-// * another checkpoint with respect to the maximum number of concurrent checkpoints
-// * (see {@link #setMaxConcurrentCheckpoints(int)}).
-// *
-// * <p>If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure
-// * that a minimum amount of time passes where no checkpoint is in progress at all.
-// *
-// * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered.
-// */
-// public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) {
-// if (minPauseBetweenCheckpoints < 0) {
-// throw new IllegalArgumentException("Pause value must be zero or positive");
-// }
-// this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
-// }
+ /**
+ * Sets the minimal pause between checkpointing attempts. The checkpoint coordinator does not
+ * trigger another checkpoint before at least this much time has passed since the previous
+ * checkpoint was triggered.
+ *
+ * <p>The pause is measured from the moment the previous checkpoint was <i>triggered</i>, not
+ * from the moment it completed. Hence, even if the maximum number of concurrent checkpoints is
+ * set to one (see {@link #setMaxConcurrentCheckpoints(int)}), this setting does not guarantee
+ * a period in which no checkpoint is in progress.
+ *
+ * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered.
+ * @throws IllegalArgumentException If the pause is negative.
+ */
+ public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) {
+ if (minPauseBetweenCheckpoints < 0) {
+ throw new IllegalArgumentException("Pause value must be zero or positive");
+ }
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ }
/**
* Gets the maximum number of checkpoint attempts that may be in progress at the same time. If this