You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/01 16:38:10 UTC

flink git commit: [FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint

Repository: flink
Updated Branches:
  refs/heads/master 7d66aaeb0 -> b181662be


[FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b181662b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b181662b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b181662b

Branch: refs/heads/master
Commit: b181662be378652d6c4405841ccda6145968d145
Parents: 7d66aae
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 20:31:07 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:55:27 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  45 ++++----
 .../checkpoint/CheckpointCoordinatorTest.java   | 108 +++++++++----------
 2 files changed, 74 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b181662b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2242c14..8ca4b2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,7 @@ public class CheckpointCoordinator {
 
 	/** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
 	 * enforce minimum processing time between checkpoint attempts */
-	private final long minPauseBetweenCheckpoints;
+	private final long minPauseBetweenCheckpointsNanos;
 
 	/** The maximum number of checkpoints that may be in progress at the same time */
 	private final int maxConcurrentCheckpointAttempts;
@@ -133,7 +133,8 @@ public class CheckpointCoordinator {
 
 	private ScheduledTrigger currentPeriodicTrigger;
 
-	private long lastTriggeredCheckpoint;
+	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
+	private long lastCheckpointCompletionNanos;
 
 	/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
 	 * Non-volatile, because only accessed in synchronized scope */
@@ -184,6 +185,11 @@ public class CheckpointCoordinator {
 					"configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
 		}
 
+		// max "in between duration" can be one year - this is to prevent numeric overflows
+		if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+			minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
+		}
+
 		// it does not make sense to schedule checkpoints more often than the desired
 		// time between checkpoints
 		if (baseInterval < minPauseBetweenCheckpoints) {
@@ -193,7 +199,7 @@ public class CheckpointCoordinator {
 		this.job = checkNotNull(job);
 		this.baseInterval = baseInterval;
 		this.checkpointTimeout = checkpointTimeout;
-		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
 		this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -352,13 +358,10 @@ public class CheckpointCoordinator {
 				}
 
 				// make sure the minimum interval between checkpoints has passed
-				long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
-				if (nextCheckpointEarliest < 0) {
-					// overflow
-					nextCheckpointEarliest = Long.MAX_VALUE;
-				}
+				final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+				final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
 
-				if (nextCheckpointEarliest > timestamp) {
+				if (durationTillNextMillis > 0) {
 					if (currentPeriodicTrigger != null) {
 						currentPeriodicTrigger.cancel();
 						currentPeriodicTrigger = null;
@@ -366,8 +369,7 @@ public class CheckpointCoordinator {
 					ScheduledTrigger trigger = new ScheduledTrigger();
 					// Reassign the new trigger to the currentPeriodicTrigger
 					currentPeriodicTrigger = trigger;
-					long delay = nextCheckpointEarliest - timestamp;
-					timer.scheduleAtFixedRate(trigger, delay, baseInterval);
+					timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
 					return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 				}
 			}
@@ -475,29 +477,25 @@ public class CheckpointCoordinator {
 						}
 
 						// make sure the minimum interval between checkpoints has passed
-						long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
-						if (nextCheckpointEarliest < 0) {
-							// overflow
-							nextCheckpointEarliest = Long.MAX_VALUE;
-						}
+						final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+						final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
 
-						if (nextCheckpointEarliest > timestamp) {
+						if (durationTillNextMillis > 0) {
 							if (currentPeriodicTrigger != null) {
 								currentPeriodicTrigger.cancel();
 								currentPeriodicTrigger = null;
 							}
+
 							ScheduledTrigger trigger = new ScheduledTrigger();
 							// Reassign the new trigger to the currentPeriodicTrigger
 							currentPeriodicTrigger = trigger;
-							long delay = nextCheckpointEarliest - timestamp;
-							timer.scheduleAtFixedRate(trigger, delay, baseInterval);
+							timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
 							return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 						}
 					}
 
 					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
 
-					lastTriggeredCheckpoint = Math.max(timestamp, lastTriggeredCheckpoint);
 					pendingCheckpoints.put(checkpointID, checkpoint);
 					timer.schedule(canceller, checkpointTimeout);
 				}
@@ -644,8 +642,13 @@ public class CheckpointCoordinator {
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
 					case SUCCESS:
 						if (checkpoint.isFullyAcknowledged()) {
-							completed = checkpoint.finalizeCheckpoint();
 
+							// record the time when this was completed, to calculate
+							// the 'min delay between checkpoints'
+							lastCheckpointCompletionNanos = System.nanoTime();
+
+							// complete the checkpoint structure
+							completed = checkpoint.finalizeCheckpoint();
 							completedCheckpointStore.addCheckpoint(completed);
 
 							LOG.info("Completed checkpoint " + checkpointId + " (in " +

http://git-wip-us.apache.org/repos/asf/flink/blob/b181662b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 8e46f4c..73eebcf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -70,7 +70,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -1229,82 +1231,72 @@ public class CheckpointCoordinatorTest {
 	 * another is triggered.
 	 */
 	@Test
-	public void testMinInterval() {
-		try {
-			final JobID jid = new JobID();
+	public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+		final JobID jid = new JobID();
 
-			// create some mock execution vertices and trigger some checkpoint
-			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
-			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		// create some mock execution vertices and trigger some checkpoint
+		final ExecutionAttemptID attemptID = new ExecutionAttemptID();
+		final ExecutionVertex vertex = mockExecutionVertex(attemptID);
+		final Execution executionAttempt = vertex.getCurrentExecutionAttempt();
 
-			final AtomicInteger numCalls = new AtomicInteger();
-			final Execution execution = vertex1.getCurrentExecutionAttempt();
+		final BlockingQueue<Long> triggerCalls = new LinkedBlockingQueue<>();
 
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocation) throws Throwable {
-					numCalls.incrementAndGet();
-					return null;
-				}
-			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				triggerCalls.add((Long) invocation.getArguments()[0]);
+				return null;
+			}
+		}).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong());
 
-			CheckpointCoordinator coord = new CheckpointCoordinator(
+		final long delay = 50;
+
+		final CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
-				10,        // periodic interval is 10 ms
-				200000,    // timeout is very long (200 s)
-				500,    // 500ms delay between checkpoints
-				10,
+				2,           // periodic interval is 2 ms
+				200_000,     // timeout is very long (200 s)
+				delay,       // 50 ms delay between checkpoints
+				1,
 				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex },
+				new ExecutionVertex[] { vertex },
+				new ExecutionVertex[] { vertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
+				"dummy-path",
 				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
+		try {
 			coord.startCheckpointScheduler();
 
-			//wait until the first checkpoint was triggered
-			for (int x=0; x<20; x++) {
-				Thread.sleep(100);
-				if (numCalls.get() > 0) {
-					break;
-				}
-			}
+			// wait until the first checkpoint was triggered
+			Long firstCallId = triggerCalls.take();
+			assertEquals(1L, firstCallId.longValue());
 
-			if (numCalls.get() == 0) {
-				fail("No checkpoint was triggered within the first 2000 ms.");
-			}
-			
-			long start = System.currentTimeMillis();
-
-			for (int x = 0; x < 20; x++) {
-				Thread.sleep(100);
-				int triggeredCheckpoints = numCalls.get();
-				long curT = System.currentTimeMillis();
-
-				/**
-				 * Within a given time-frame T only T/500 checkpoints may be triggered due to the configured minimum
-				 * interval between checkpoints. This value however does not not take the first triggered checkpoint
-				 * into account (=> +1). Furthermore we have to account for the mis-alignment between checkpoints
-				 * being triggered and our time measurement (=> +1); for T=1200 a total of 3-4 checkpoints may have been
-				 * triggered depending on whether the end of the minimum interval for the first checkpoints ends before
-				 * or after T=200.
-				 */
-				long maxAllowedCheckpoints = (curT - start) / 500 + 2;
-				assertTrue(maxAllowedCheckpoints >= triggeredCheckpoints);
-			}
+			AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint(
+					jid, attemptID, new CheckpointMetaData(1L, System.currentTimeMillis()));
 
-			coord.stopCheckpointScheduler();
+			// tell the coordinator that the checkpoint is done
+			final long ackTime = System.nanoTime();
+			coord.receiveAcknowledgeMessage(ackMsg);
+
+			// wait until the next checkpoint is triggered
+			Long nextCallId = triggerCalls.take();
+			final long nextCheckpointTime = System.nanoTime();
+			assertEquals(2L, nextCallId.longValue());
+
+			final long delayMillis = (nextCheckpointTime - ackTime) / 1_000_000;
 
+			// we need to add one ms here to account for rounding errors
+			if (delayMillis + 1 < delay) {
+				fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + delay);
+			}
+		}
+		finally {
+			coord.stopCheckpointScheduler();
 			coord.shutdown(JobStatus.FINISHED);
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}		
 	}
 
 	@Test