You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/19 10:04:31 UTC
[1/2] flink git commit: [FLINK-4322] [checkpointing] Ignore minimum
delay for savepoints
Repository: flink
Updated Branches:
refs/heads/master 0d53daa2f -> 5d7f88031
[FLINK-4322] [checkpointing] Ignore minimum delay for savepoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8854d75c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8854d75c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8854d75c
Branch: refs/heads/master
Commit: 8854d75c13a6e7a593baf6d0d5ae4f47c90718e3
Parents: 0d53daa
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Aug 18 17:27:41 2016 +0530
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Aug 19 12:03:44 2016 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 20 +++++++++++---------
.../checkpoint/CheckpointCoordinatorTest.java | 2 +-
2 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2a1ece0..ff54bad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -333,17 +333,19 @@ public class CheckpointCoordinator {
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
}
- }
- // make sure the minimum interval between checkpoints has passed
- if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
- if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
- currentPeriodicTrigger = null;
+ // make sure the minimum interval between checkpoints has passed
+ if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger = null;
+ }
+ ScheduledTrigger trigger = new ScheduledTrigger();
+ // Reassign the new trigger to the currentPeriodicTrigger
+ currentPeriodicTrigger = trigger;
+ timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
+ return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
- ScheduledTrigger trigger = new ScheduledTrigger();
- timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
- return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3341095..f243803 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1408,7 +1408,7 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.triggerCheckpoint(timestamp + 3));
assertEquals(2, coord.getNumberOfPendingCheckpoints());
- Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp);
+ Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
long savepointId2 = counter.getLast();
assertEquals(3, coord.getNumberOfPendingCheckpoints());
[2/2] flink git commit: [FLINK-4322] [checkpointing] Extend
CheckpointCoordinatorTest
Posted by uc...@apache.org.
[FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest
The added tests check that savepoints ignore the maximum number
of concurrent checkpoints and minimum delay between checkpoints.
This closes #2385.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d7f8803
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d7f8803
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d7f8803
Branch: refs/heads/master
Commit: 5d7f8803155d2eb8865cce9a60dd677c2400261c
Parents: 8854d75
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Aug 19 12:01:24 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Aug 19 12:04:12 2016 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinatorTest.java | 90 +++++++++++++++++++-
1 file changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5d7f8803/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f243803..09c53d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.mockito.stubbing.Answer;
import scala.concurrent.Future;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1646,7 +1647,94 @@ public class CheckpointCoordinatorTest {
fail(e.getMessage());
}
}
-
+
+ /**
+ * Tests that the savepoints can be triggered concurrently.
+ */
+ @Test
+ public void testConcurrentSavepoints() throws Exception {
+ JobID jobId = new JobID();
+
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+ StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jobId,
+ 100000,
+ 200000,
+ 0L,
+ 1, // max one checkpoint at a time => should not affect savepoints
+ 42,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ cl,
+ checkpointIDCounter,
+ new StandaloneCompletedCheckpointStore(2, cl),
+ new HeapSavepointStore(),
+ new DisabledCheckpointStatsTracker());
+
+ List<Future<String>> savepointFutures = new ArrayList<>();
+
+ int numSavepoints = 5;
+
+ // Trigger savepoints
+ for (int i = 0; i < numSavepoints; i++) {
+ savepointFutures.add(coord.triggerSavepoint(i));
+ }
+
+ // After triggering multiple savepoints, all should in progress
+ for (Future<String> savepointFuture : savepointFutures) {
+ assertFalse(savepointFuture.isCompleted());
+ }
+
+ // ACK all savepoints
+ long checkpointId = checkpointIDCounter.getLast();
+ for (int i = 0; i < numSavepoints; i++, checkpointId--) {
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
+ }
+
+ // After ACKs, all should be completed
+ for (Future<String> savepointFuture : savepointFutures) {
+ assertTrue(savepointFuture.isCompleted());
+ }
+ }
+
+ /**
+ * Tests that no minimum delay between savepoints is enforced.
+ */
+ @Test
+ public void testMinDelayBetweenSavepoints() throws Exception {
+ JobID jobId = new JobID();
+
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jobId,
+ 100000,
+ 200000,
+ 100000000L, // very long min delay => should not affect savepoints
+ 1,
+ 42,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl),
+ new HeapSavepointStore(),
+ new DisabledCheckpointStatsTracker());
+
+ Future<String> savepoint0 = coord.triggerSavepoint(0);
+ assertFalse("Did not trigger savepoint", savepoint0.isCompleted());
+
+ Future<String> savepoint1 = coord.triggerSavepoint(1);
+ assertFalse("Did not trigger savepoint", savepoint1.isCompleted());
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------