You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/12/21 14:29:20 UTC

[flink] 10/18: [FLINK-19681][checkpointing] Reset channel barrier tracking from AlignedController

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f40b4eb7703ce2a99fe8f8ef775a63197d45694c
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Nov 9 19:21:30 2020 +0100

    [FLINK-19681][checkpointing] Reset channel barrier tracking from AlignedController
    
    Input channels are unaware of the controller type and always update
    their pendingCheckpointBarrierId. Therefore, it must also be reset
    regardless of whether the aligned or the unaligned controller is in
    use. Otherwise, pendingCheckpointBarrierId may be left in a stale state
    when a new barrier is received.
---
 .../flink/streaming/runtime/io/AlignedController.java    |  8 ++++++++
 .../runtime/io/AlignedControllerMassiveRandomTest.java   |  6 +++++-
 .../streaming/runtime/io/AlignedControllerTest.java      |  6 +++++-
 .../streaming/runtime/io/AlternatingControllerTest.java  | 16 ++++++++++++++++
 4 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index 0991dc9..be6f2ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -102,6 +102,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException {
 		checkState(!barrier.getCheckpointOptions().isUnalignedCheckpoint());
+		resetPendingCheckpoint(barrier.getId());
 		resumeConsumption();
 		return Optional.of(barrier);
 	}
@@ -110,6 +111,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 	public void abortPendingCheckpoint(
 			long cancelledId,
 			CheckpointException exception) throws IOException {
+		resetPendingCheckpoint(cancelledId);
 		resumeConsumption();
 	}
 
@@ -120,6 +122,12 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 		resumeConsumption(channelInfo);
 	}
 
+	protected void resetPendingCheckpoint(long cancelledId) {
+		for (final CheckpointableInput input : inputs) {
+			input.checkpointStopped(cancelledId);
+		}
+	}
+
 	public Collection<InputChannelInfo> getBlockedChannels() {
 		return blockedChannels.entrySet()
 			.stream()
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
index 0dae4a3..da9bdec 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
@@ -71,7 +71,11 @@ public class AlignedControllerMassiveRandomTest {
 						"Testing: No task associated",
 						new DummyCheckpointInvokable(),
 						myIG.getNumberOfInputChannels(),
-						new AlignedController(myIG)),
+						new AlignedController(myIG) {
+							@Override
+							protected void resetPendingCheckpoint(long cancelledId) {
+							}
+					}),
 					new SyncMailboxExecutor());
 
 			for (int i = 0; i < 2000000; i++) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
index b9eac08..4538f96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
@@ -133,7 +133,11 @@ public class AlignedControllerTest {
 				"Testing",
 				toNotify,
 				gate.getNumberOfInputChannels(),
-				new AlignedController(gate)),
+				new AlignedController(gate) {
+					@Override
+					protected void resetPendingCheckpoint(long cancelledId) {
+					}
+				}),
 			new SyncMailboxExecutor());
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 8d81df2..b12a0ad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -68,6 +68,22 @@ import static org.junit.Assert.assertFalse;
  */
 public class AlternatingControllerTest {
 
+	/**
+	 * Upon subsuming (or canceling) a checkpoint, channels should be notified regardless of whether UC controller is
+	 * currently being used or not. Otherwise, channels may not capture in-flight buffers.
+	 */
+	@Test
+	public void testChannelResetOnNewBarrier() throws Exception {
+		RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
+		CheckpointedInputGate gate = buildRemoteInputGate(new ValidatingCheckpointHandler(), 2, stateWriter);
+
+		sendBarrier(0, System.currentTimeMillis(), SAVEPOINT, gate, 0); // using AC because UC would require ordering in gate while polling
+		((RemoteInputChannel) gate.getChannel(0)).onBuffer(createBuffer(1024), 1, 0); // to be captured
+		send(toBuffer(new CheckpointBarrier(1, System.currentTimeMillis(), unaligned(getDefault())), true), 1, gate);
+
+		assertFalse(stateWriter.getAddedInput().isEmpty());
+	}
+
 	@Test
 	public void testCheckpointHandling() throws Exception {
 		testBarrierHandling(CHECKPOINT);