You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/11/28 16:23:24 UTC
[flink] branch release-1.9 updated: [FLINK-10377] Support
checkpoint overtaking a savepoint in TwoPhaseCommitSink
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new cf82a14 [FLINK-10377] Support checkpoint overtaking a savepoint in TwoPhaseCommitSink
cf82a14 is described below
commit cf82a145921d08fd8ac6e76d57b50060056d0f47
Author: Stefan Richter <st...@gmail.com>
AuthorDate: Thu Nov 28 17:21:52 2019 +0100
[FLINK-10377] Support checkpoint overtaking a savepoint in TwoPhaseCommitSink
The precondition checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending"); in TwoPhaseCommitSinkFunction.notifyCheckpointComplete() seems too strict: a checkpoint can overtake an earlier checkpoint or savepoint, so the completion notification for the earlier one may arrive late, when no transaction is pending anymore, and the precondition fails. In this case the commit was already performed by the first notification, which subsumes the late one. I think the check can be removed.
This can happen in the following scenario:
# savepoint is triggered
# checkpoint is triggered
# checkpoint completes (but it doesn't subsume the savepoint, because checkpoints subsume only other checkpoints).
# savepoint completes
---
.../functions/sink/TwoPhaseCommitSinkFunction.java | 1 -
.../sink/TwoPhaseCommitSinkFunctionTest.java | 20 ++++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 588592e..7516f8d1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -264,7 +264,6 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
//
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
- checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 84c0104..91b1fcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -139,6 +139,26 @@ public class TwoPhaseCommitSinkFunctionTest {
harness.close();
}
+ /**
+ * This can happen if a savepoint and a checkpoint are triggered one after another and the checkpoint completes first.
+ * See FLINK-10377 and FLINK-14979 for more details.
+ **/
+ @Test
+ public void testSubsumedNotificationOfPreviousCheckpoint() throws Exception {
+ harness.open();
+ harness.processElement("42", 0);
+ harness.snapshot(0, 1);
+ harness.processElement("43", 2);
+ harness.snapshot(1, 3);
+ harness.processElement("44", 4);
+ harness.snapshot(2, 5);
+ harness.notifyOfCompletedCheckpoint(2);
+ harness.notifyOfCompletedCheckpoint(1);
+
+ assertExactlyOnce(Arrays.asList("42", "43", "44"));
+ assertEquals(1, tmpDirectory.listFiles().size()); // one for currentTransaction
+ }
+
@Test
public void testNotifyOfCompletedCheckpoint() throws Exception {
harness.open();