You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/04 08:01:12 UTC
[flink] 01/02: [FLINK-18748][Runtime/Checkpointing] trigger
checkpoint immediately if it's unperiodic
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f9071d930285a895f8a415bdae81d0cb6fb34cf3
Author: DG-Wangtao <ta...@deepglint.com>
AuthorDate: Sun Aug 2 17:33:38 2020 +0800
[FLINK-18748][Runtime/Checkpointing] trigger checkpoint immediately if it's unperiodic
---
.../flink/runtime/checkpoint/CheckpointRequestDecider.java | 13 +++++++------
.../runtime/checkpoint/CheckpointRequestDeciderTest.java | 4 ++--
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 0850c28..d054561 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -136,6 +136,11 @@ class CheckpointRequestDecider {
.map(unused -> queuedRequests.pollFirst());
}
+ CheckpointTriggerRequest first = queuedRequests.first();
+ if (first.isForce() || !first.isPeriodic) {
+ return Optional.of(queuedRequests.pollFirst());
+ }
+
long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
if (nextTriggerDelayMillis > 0) {
return onTooEarly(nextTriggerDelayMillis);
@@ -146,15 +151,11 @@ class CheckpointRequestDecider {
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
CheckpointTriggerRequest first = queuedRequests.first();
- if (first.isForce()) {
- return Optional.of(queuedRequests.pollFirst());
- } else if (first.isPeriodic) {
+ if (first.isPeriodic) {
queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
rescheduleTrigger.accept(nextTriggerDelayMillis);
- return Optional.empty();
- } else {
- return Optional.empty();
}
+ return Optional.empty();
}
private long nextTriggerDelayMillis(long lastCheckpointCompletionRelativeTime) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 7facf11..f2a3536 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -119,13 +119,13 @@ public class CheckpointRequestDeciderTest {
public void testSavepointTiming() {
testTiming(regularSavepoint(), TriggerExpectation.IMMEDIATELY);
testTiming(periodicSavepoint(), TriggerExpectation.IMMEDIATELY);
- testTiming(nonForcedSavepoint(), TriggerExpectation.AFTER_PAUSE);
+ testTiming(nonForcedSavepoint(), TriggerExpectation.IMMEDIATELY);
}
@Test
public void testCheckpointTiming() {
testTiming(regularCheckpoint(), TriggerExpectation.DROPPED);
- testTiming(manualCheckpoint(), TriggerExpectation.AFTER_PAUSE);
+ testTiming(manualCheckpoint(), TriggerExpectation.IMMEDIATELY);
}
private enum TriggerExpectation {IMMEDIATELY, AFTER_PAUSE, DROPPED}