You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/04 08:01:12 UTC

[flink] 01/02: [FLINK-18748][Runtime/Checkpointing] trigger checkpoint immediately if it's unperiodic

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9071d930285a895f8a415bdae81d0cb6fb34cf3
Author: DG-Wangtao <ta...@deepglint.com>
AuthorDate: Sun Aug 2 17:33:38 2020 +0800

    [FLINK-18748][Runtime/Checkpointing] trigger checkpoint immediately if it's unperiodic
---
 .../flink/runtime/checkpoint/CheckpointRequestDecider.java  | 13 +++++++------
 .../runtime/checkpoint/CheckpointRequestDeciderTest.java    |  4 ++--
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 0850c28..d054561 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -136,6 +136,11 @@ class CheckpointRequestDecider {
 				.map(unused -> queuedRequests.pollFirst());
 		}
 
+		CheckpointTriggerRequest first = queuedRequests.first();
+		if (first.isForce() || !first.isPeriodic) {
+			return Optional.of(queuedRequests.pollFirst());
+		}
+
 		long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
 		if (nextTriggerDelayMillis > 0) {
 			return onTooEarly(nextTriggerDelayMillis);
@@ -146,15 +151,11 @@ class CheckpointRequestDecider {
 
 	private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
 		CheckpointTriggerRequest first = queuedRequests.first();
-		if (first.isForce()) {
-			return Optional.of(queuedRequests.pollFirst());
-		} else if (first.isPeriodic) {
+		if (first.isPeriodic) {
 			queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
 			rescheduleTrigger.accept(nextTriggerDelayMillis);
-			return Optional.empty();
-		} else {
-			return Optional.empty();
 		}
+		return Optional.empty();
 	}
 
 	private long nextTriggerDelayMillis(long lastCheckpointCompletionRelativeTime) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 7facf11..f2a3536 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -119,13 +119,13 @@ public class CheckpointRequestDeciderTest {
 	public void testSavepointTiming() {
 		testTiming(regularSavepoint(), TriggerExpectation.IMMEDIATELY);
 		testTiming(periodicSavepoint(), TriggerExpectation.IMMEDIATELY);
-		testTiming(nonForcedSavepoint(), TriggerExpectation.AFTER_PAUSE);
+		testTiming(nonForcedSavepoint(), TriggerExpectation.IMMEDIATELY);
 	}
 
 	@Test
 	public void testCheckpointTiming() {
 		testTiming(regularCheckpoint(), TriggerExpectation.DROPPED);
-		testTiming(manualCheckpoint(), TriggerExpectation.AFTER_PAUSE);
+		testTiming(manualCheckpoint(), TriggerExpectation.IMMEDIATELY);
 	}
 
 	private enum TriggerExpectation {IMMEDIATELY, AFTER_PAUSE, DROPPED}