You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/11/28 16:23:24 UTC

[flink] branch release-1.9 updated: [FLINK-10377] Support checkpoint overtaking a savepoint in TwoPhaseCommitSink

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new cf82a14  [FLINK-10377] Support checkpoint overtaking a savepoint in TwoPhaseCommitSink
cf82a14 is described below

commit cf82a145921d08fd8ac6e76d57b50060056d0f47
Author: Stefan Richter <st...@gmail.com>
AuthorDate: Thu Nov 28 17:21:52 2019 +0100

    [FLINK-10377] Support checkpoint overtaking a savepoint in TwoPhaseCommitSink
    
    The precondition checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending"); in TwoPhaseCommitSinkFunction.notifyCheckpointComplete() seems too strict: a checkpoint can overtake a savepoint (or an earlier checkpoint), in which case the late completion notification arrives when no transaction is pending any more and the precondition fails. In this case the commit was already performed by the first notification, which subsumes the late one. I think the check can be removed.
    
    This can happen in the following scenario:
    # savepoint is triggered
    # checkpoint is triggered
    # checkpoint completes (but it doesn't subsume the savepoint, because checkpoints subsume only other checkpoints).
    # savepoint completes
---
 .../functions/sink/TwoPhaseCommitSinkFunction.java   |  1 -
 .../sink/TwoPhaseCommitSinkFunctionTest.java         | 20 ++++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 588592e..7516f8d1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -264,7 +264,6 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 		//
 
 		Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
-		checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
 		Throwable firstError = null;
 
 		while (pendingTransactionIterator.hasNext()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 84c0104..91b1fcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -139,6 +139,26 @@ public class TwoPhaseCommitSinkFunctionTest {
 		harness.close();
 	}
 
+	/**
+	 * This can happen if a savepoint and a checkpoint are triggered one after another and the checkpoint completes first.
+	 * See FLINK-10377 and FLINK-14979 for more details.
+	 **/
+	@Test
+	public void testSubsumedNotificationOfPreviousCheckpoint() throws Exception {
+		harness.open();
+		harness.processElement("42", 0);
+		harness.snapshot(0, 1);
+		harness.processElement("43", 2);
+		harness.snapshot(1, 3);
+		harness.processElement("44", 4);
+		harness.snapshot(2, 5);
+		harness.notifyOfCompletedCheckpoint(2);
+		harness.notifyOfCompletedCheckpoint(1);
+
+		assertExactlyOnce(Arrays.asList("42", "43", "44"));
+		assertEquals(1, tmpDirectory.listFiles().size()); // one for currentTransaction
+	}
+
 	@Test
 	public void testNotifyOfCompletedCheckpoint() throws Exception {
 		harness.open();