You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/11 18:37:01 UTC

flink git commit: [FLINK-3492] Configurable interval between Checkpoints

Repository: flink
Updated Branches:
  refs/heads/master ef7f9ac9a -> 2e63d1afb


[FLINK-3492] Configurable interval between Checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e63d1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e63d1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e63d1af

Branch: refs/heads/master
Commit: 2e63d1afb2358760109ba0c90011ef565eaae0ff
Parents: ef7f9ac
Author: zentol <ch...@apache.org>
Authored: Thu Mar 17 12:27:29 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 11 18:36:21 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 14 +++
 .../checkpoint/CheckpointCoordinatorTest.java   | 89 +++++++++++++++++++-
 .../api/environment/CheckpointConfig.java       | 34 ++++----
 3 files changed, 117 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d1af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index edeab6a..8a856bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -127,6 +127,8 @@ public class CheckpointCoordinator {
 
 	private ScheduledTrigger currentPeriodicTrigger;
 
+	private long lastTriggeredCheckpoint;
+
 	/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
 	 * Non-volatile, because only accessed in synchronized scope */
 	private boolean periodicScheduling;
@@ -370,6 +372,17 @@ public class CheckpointCoordinator {
 				}
 				return false;
 			}
+
+			//make sure the minimum interval between checkpoints has passed
+			if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
+				if (currentPeriodicTrigger != null) {
+					currentPeriodicTrigger.cancel();
+					currentPeriodicTrigger = null;
+				}
+				ScheduledTrigger trigger = new ScheduledTrigger();
+				timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
+				return false;
+			}
 		}
 
 		// first check if all tasks that we need to trigger are running.
@@ -403,6 +416,7 @@ public class CheckpointCoordinator {
 
 		// we will actually trigger this checkpoint!
 
+		lastTriggeredCheckpoint = timestamp;
 		final long checkpointID;
 		if (nextCheckpointId < 0) {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d1af/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d385d73..266cf52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -30,9 +30,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -43,7 +41,12 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
@@ -1027,6 +1030,86 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
+	/**
+	 * This test verifies that, after a checkpoint has been triggered, at least the configured
+	 * minimum pause between checkpoints passes before the coordinator triggers the next one.
+	 * The coordinator uses a short periodic interval (10 ms) and a long minimum pause (500 ms).
+	 */
+	@Test
+	public void testMinInterval() throws Exception {
+		final JobID jid = new JobID();
+
+		// create a mock execution vertex that counts the TriggerCheckpoint messages sent to it
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		final AtomicInteger numCalls = new AtomicInteger();
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				if (invocation.getArguments()[0] instanceof TriggerCheckpoint) {
+					numCalls.incrementAndGet();
+				}
+				return null;
+			}
+		}).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			10,		// periodic interval is 10 ms
+			200000,	// timeout is very long (200 s)
+			500,	// 500 ms minimum pause between checkpoints
+			10,
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 },
+			new ExecutionVertex[] { vertex1 }, cl, new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
+			new DisabledCheckpointStatsTracker());
+
+		try {
+			coord.startCheckpointScheduler();
+
+			// wait until the first checkpoint was triggered
+			for (int x = 0; x < 20; x++) {
+				Thread.sleep(100);
+				if (numCalls.get() > 0) {
+					break;
+				}
+			}
+
+			if (numCalls.get() == 0) {
+				fail("No checkpoint was triggered within the first 2000 ms.");
+			}
+
+			// for roughly the next 2000 ms, check that the minimum pause bounds the checkpoint count
+			long start = System.currentTimeMillis();
+
+			for (int x = 0; x < 20; x++) {
+				Thread.sleep(100);
+				int triggeredCheckpoints = numCalls.get();
+				long curT = System.currentTimeMillis();
+
+				/*
+				 * Within a time frame T at most T/500 checkpoints may be triggered due to the configured
+				 * minimum pause of 500 ms. This bound does not include the checkpoint that was triggered
+				 * before 'start' (=> +1). Furthermore, 'start' is not aligned with the points in time at
+				 * which checkpoints are triggered, so one additional checkpoint may fall into the time
+				 * frame (=> +1).
+				 */
+				long maxAllowedCheckpoints = (curT - start) / 500 + 2;
+				assertTrue("Triggered " + triggeredCheckpoints + " checkpoints in total, but only "
+						+ maxAllowedCheckpoints + " were allowed after " + (curT - start) + " ms.",
+					maxAllowedCheckpoints >= triggeredCheckpoints);
+			}
+		}
+		finally {
+			// stop and release the coordinator even if an assertion failed
+			coord.stopCheckpointScheduler();
+			coord.shutdown();
+		}
+	}
+
 	@Test
 	public void testMaxConcurrentAttempts1() {
 		testMaxConcurrentAttemps(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d1af/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 327b524..cdeef94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -153,23 +153,23 @@ public class CheckpointConfig implements java.io.Serializable {
 		return minPauseBetweenCheckpoints;
 	}
 
-//	/**
-//	 * Sets the minimal pause between checkpointing attempts. This setting defines how soon the
-//	 * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
-//	 * another checkpoint with respect to the maximum number of concurrent checkpoints
-//	 * (see {@link #setMaxConcurrentCheckpoints(int)}).
-//	 * 
-//	 * <p>If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure
-//	 * that a minimum amount of time passes where no checkpoint is in progress at all.
-//	 * 
-//	 * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered.
-//	 */
-//	public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) {
-//		if (minPauseBetweenCheckpoints < 0) {
-//			throw new IllegalArgumentException("Pause value must be zero or positive");
-//		}
-//		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
-//	}
+	/**
+	 * Sets the minimal pause between checkpointing attempts, i.e. how soon the checkpoint
+	 * coordinator may trigger another checkpoint once the maximum number of concurrent
+	 * checkpoints (see {@link #setMaxConcurrentCheckpoints(int)}) permits another one.
+	 * With at most one concurrent checkpoint, this is meant to ensure a minimum amount of
+	 * time in which no checkpoint is in progress at all.
+	 *
+	 * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered.
+	 * @throws IllegalArgumentException if the given pause is negative
+	 */
+	public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) {
+		if (minPauseBetweenCheckpoints >= 0) {
+			this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		} else {
+			throw new IllegalArgumentException("Pause value must be zero or positive");
+		}
+	}
 
 	/**
 	 * Gets the maximum number of checkpoint attempts that may be in progress at the same time. If this