You are viewing a plain-text version of this content. (The canonical link, originally attached to the word "here", was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/01 16:38:10 UTC
flink git commit: [FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint
Repository: flink
Updated Branches:
refs/heads/master 7d66aaeb0 -> b181662be
[FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b181662b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b181662b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b181662b
Branch: refs/heads/master
Commit: b181662be378652d6c4405841ccda6145968d145
Parents: 7d66aae
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 20:31:07 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:55:27 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 45 ++++----
.../checkpoint/CheckpointCoordinatorTest.java | 108 +++++++++----------
2 files changed, 74 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b181662b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2242c14..8ca4b2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,7 @@ public class CheckpointCoordinator {
/** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
- private final long minPauseBetweenCheckpoints;
+ private final long minPauseBetweenCheckpointsNanos;
/** The maximum number of checkpoints that may be in progress at the same time */
private final int maxConcurrentCheckpointAttempts;
@@ -133,7 +133,8 @@ public class CheckpointCoordinator {
private ScheduledTrigger currentPeriodicTrigger;
- private long lastTriggeredCheckpoint;
+ /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
+ private long lastCheckpointCompletionNanos;
/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
* Non-volatile, because only accessed in synchronized scope */
@@ -184,6 +185,11 @@ public class CheckpointCoordinator {
"configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
}
+ // max "in between duration" can be one year - this is to prevent numeric overflows
+ if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+ minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
+ }
+
// it does not make sense to schedule checkpoints more often then the desired
// time between checkpoints
if (baseInterval < minPauseBetweenCheckpoints) {
@@ -193,7 +199,7 @@ public class CheckpointCoordinator {
this.job = checkNotNull(job);
this.baseInterval = baseInterval;
this.checkpointTimeout = checkpointTimeout;
- this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -352,13 +358,10 @@ public class CheckpointCoordinator {
}
// make sure the minimum interval between checkpoints has passed
- long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
- if (nextCheckpointEarliest < 0) {
- // overflow
- nextCheckpointEarliest = Long.MAX_VALUE;
- }
+ final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+ final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
- if (nextCheckpointEarliest > timestamp) {
+ if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel();
currentPeriodicTrigger = null;
@@ -366,8 +369,7 @@ public class CheckpointCoordinator {
ScheduledTrigger trigger = new ScheduledTrigger();
// Reassign the new trigger to the currentPeriodicTrigger
currentPeriodicTrigger = trigger;
- long delay = nextCheckpointEarliest - timestamp;
- timer.scheduleAtFixedRate(trigger, delay, baseInterval);
+ timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
@@ -475,29 +477,25 @@ public class CheckpointCoordinator {
}
// make sure the minimum interval between checkpoints has passed
- long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
- if (nextCheckpointEarliest < 0) {
- // overflow
- nextCheckpointEarliest = Long.MAX_VALUE;
- }
+ final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+ final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
- if (nextCheckpointEarliest > timestamp) {
+ if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel();
currentPeriodicTrigger = null;
}
+
ScheduledTrigger trigger = new ScheduledTrigger();
// Reassign the new trigger to the currentPeriodicTrigger
currentPeriodicTrigger = trigger;
- long delay = nextCheckpointEarliest - timestamp;
- timer.scheduleAtFixedRate(trigger, delay, baseInterval);
+ timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
- lastTriggeredCheckpoint = Math.max(timestamp, lastTriggeredCheckpoint);
pendingCheckpoints.put(checkpointID, checkpoint);
timer.schedule(canceller, checkpointTimeout);
}
@@ -644,8 +642,13 @@ public class CheckpointCoordinator {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
case SUCCESS:
if (checkpoint.isFullyAcknowledged()) {
- completed = checkpoint.finalizeCheckpoint();
+ // record the time when this was completed, to calculate
+ // the 'min delay between checkpoints'
+ lastCheckpointCompletionNanos = System.nanoTime();
+
+ // complete the checkpoint structure
+ completed = checkpoint.finalizeCheckpoint();
completedCheckpointStore.addCheckpoint(completed);
LOG.info("Completed checkpoint " + checkpointId + " (in " +
http://git-wip-us.apache.org/repos/asf/flink/blob/b181662b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 8e46f4c..73eebcf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -70,7 +70,9 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -1229,82 +1231,72 @@ public class CheckpointCoordinatorTest {
* another is triggered.
*/
@Test
- public void testMinInterval() {
- try {
- final JobID jid = new JobID();
+ public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+ final JobID jid = new JobID();
- // create some mock execution vertices and trigger some checkpoint
- final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
- ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+ // create some mock execution vertices and trigger some checkpoint
+ final ExecutionAttemptID attemptID = new ExecutionAttemptID();
+ final ExecutionVertex vertex = mockExecutionVertex(attemptID);
+ final Execution executionAttempt = vertex.getCurrentExecutionAttempt();
- final AtomicInteger numCalls = new AtomicInteger();
- final Execution execution = vertex1.getCurrentExecutionAttempt();
+ final BlockingQueue<Long> triggerCalls = new LinkedBlockingQueue<>();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- numCalls.incrementAndGet();
- return null;
- }
- }).when(execution).triggerCheckpoint(anyLong(), anyLong());
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ triggerCalls.add((Long) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong());
- CheckpointCoordinator coord = new CheckpointCoordinator(
+ final long delay = 50;
+
+ final CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
- 10, // periodic interval is 10 ms
- 200000, // timeout is very long (200 s)
- 500, // 500ms delay between checkpoints
- 10,
+ 2, // periodic interval is 2 ms
+ 200_000, // timeout is very long (200 s)
+ delay, // 50 ms delay between checkpoints
+ 1,
ExternalizedCheckpointSettings.none(),
- new ExecutionVertex[] { vertex1 },
- new ExecutionVertex[] { vertex1 },
- new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
+ "dummy-path",
new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
+ try {
coord.startCheckpointScheduler();
- //wait until the first checkpoint was triggered
- for (int x=0; x<20; x++) {
- Thread.sleep(100);
- if (numCalls.get() > 0) {
- break;
- }
- }
+ // wait until the first checkpoint was triggered
+ Long firstCallId = triggerCalls.take();
+ assertEquals(1L, firstCallId.longValue());
- if (numCalls.get() == 0) {
- fail("No checkpoint was triggered within the first 2000 ms.");
- }
-
- long start = System.currentTimeMillis();
-
- for (int x = 0; x < 20; x++) {
- Thread.sleep(100);
- int triggeredCheckpoints = numCalls.get();
- long curT = System.currentTimeMillis();
-
- /**
- * Within a given time-frame T only T/500 checkpoints may be triggered due to the configured minimum
- * interval between checkpoints. This value however does not not take the first triggered checkpoint
- * into account (=> +1). Furthermore we have to account for the mis-alignment between checkpoints
- * being triggered and our time measurement (=> +1); for T=1200 a total of 3-4 checkpoints may have been
- * triggered depending on whether the end of the minimum interval for the first checkpoints ends before
- * or after T=200.
- */
- long maxAllowedCheckpoints = (curT - start) / 500 + 2;
- assertTrue(maxAllowedCheckpoints >= triggeredCheckpoints);
- }
+ AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint(
+ jid, attemptID, new CheckpointMetaData(1L, System.currentTimeMillis()));
- coord.stopCheckpointScheduler();
+ // tell the coordinator that the checkpoint is done
+ final long ackTime = System.nanoTime();
+ coord.receiveAcknowledgeMessage(ackMsg);
+
+ // wait until the next checkpoint is triggered
+ Long nextCallId = triggerCalls.take();
+ final long nextCheckpointTime = System.nanoTime();
+ assertEquals(2L, nextCallId.longValue());
+
+ final long delayMillis = (nextCheckpointTime - ackTime) / 1_000_000;
+ // we need to add one ms here to account for rounding errors
+ if (delayMillis + 1 < delay) {
+ fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + delay);
+ }
+ }
+ finally {
+ coord.stopCheckpointScheduler();
coord.shutdown(JobStatus.FINISHED);
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
}
@Test