You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/19 10:04:31 UTC

[1/2] flink git commit: [FLINK-4322] [checkpointing] Ignore minimum delay for savepoints

Repository: flink
Updated Branches:
  refs/heads/master 0d53daa2f -> 5d7f88031


[FLINK-4322] [checkpointing] Ignore minimum delay for savepoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8854d75c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8854d75c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8854d75c

Branch: refs/heads/master
Commit: 8854d75c13a6e7a593baf6d0d5ae4f47c90718e3
Parents: 0d53daa
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Aug 18 17:27:41 2016 +0530
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Aug 19 12:03:44 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 20 +++++++++++---------
 .../checkpoint/CheckpointCoordinatorTest.java   |  2 +-
 2 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2a1ece0..ff54bad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -333,17 +333,19 @@ public class CheckpointCoordinator {
 					}
 					return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
 				}
-			}
 
-			// make sure the minimum interval between checkpoints has passed
-			if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
-				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel();
-					currentPeriodicTrigger = null;
+				// make sure the minimum interval between checkpoints has passed
+				if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
+					if (currentPeriodicTrigger != null) {
+						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger = null;
+					}
+					ScheduledTrigger trigger = new ScheduledTrigger();
+					// Reassign the new trigger to the currentPeriodicTrigger
+					currentPeriodicTrigger = trigger;
+					timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
+					return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 				}
-				ScheduledTrigger trigger = new ScheduledTrigger();
-				timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
-				return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3341095..f243803 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1408,7 +1408,7 @@ public class CheckpointCoordinatorTest {
 		assertTrue(coord.triggerCheckpoint(timestamp + 3));
 		assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp);
+		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
 		long savepointId2 = counter.getLast();
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 


[2/2] flink git commit: [FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest

Posted by uc...@apache.org.
[FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest

The added tests check that savepoints ignore the maximum number
of concurrent checkpoints and minimum delay between checkpoints.

This closes #2385.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d7f8803
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d7f8803
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d7f8803

Branch: refs/heads/master
Commit: 5d7f8803155d2eb8865cce9a60dd677c2400261c
Parents: 8854d75
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Aug 19 12:01:24 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Aug 19 12:04:12 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinatorTest.java   | 90 +++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d7f8803/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f243803..09c53d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.mockito.stubbing.Answer;
 import scala.concurrent.Future;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1646,7 +1647,94 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Tests that the savepoints can be triggered concurrently.
+	 */
+	@Test
+	public void testConcurrentSavepoints() throws Exception {
+		JobID jobId = new JobID();
+
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				jobId,
+				100000,
+				200000,
+				0L,
+				1, // max one checkpoint at a time => should not affect savepoints
+				42,
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				cl,
+				checkpointIDCounter,
+				new StandaloneCompletedCheckpointStore(2, cl),
+				new HeapSavepointStore(),
+				new DisabledCheckpointStatsTracker());
+
+		List<Future<String>> savepointFutures = new ArrayList<>();
+
+		int numSavepoints = 5;
+
+		// Trigger savepoints
+		for (int i = 0; i < numSavepoints; i++) {
+			savepointFutures.add(coord.triggerSavepoint(i));
+		}
+
+		// After triggering multiple savepoints, all should in progress
+		for (Future<String> savepointFuture : savepointFutures) {
+			assertFalse(savepointFuture.isCompleted());
+		}
+
+		// ACK all savepoints
+		long checkpointId = checkpointIDCounter.getLast();
+		for (int i = 0; i < numSavepoints; i++, checkpointId--) {
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
+		}
+
+		// After ACKs, all should be completed
+		for (Future<String> savepointFuture : savepointFutures) {
+			assertTrue(savepointFuture.isCompleted());
+		}
+	}
+
+	/**
+	 * Tests that no minimum delay between savepoints is enforced.
+	 */
+	@Test
+	public void testMinDelayBetweenSavepoints() throws Exception {
+		JobID jobId = new JobID();
+
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				jobId,
+				100000,
+				200000,
+				100000000L, // very long min delay => should not affect savepoints
+				1,
+				42,
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				cl,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2, cl),
+				new HeapSavepointStore(),
+				new DisabledCheckpointStatsTracker());
+
+		Future<String> savepoint0 = coord.triggerSavepoint(0);
+		assertFalse("Did not trigger savepoint", savepoint0.isCompleted());
+
+		Future<String> savepoint1 = coord.triggerSavepoint(1);
+		assertFalse("Did not trigger savepoint", savepoint1.isCompleted());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------