Posted to commits@flink.apache.org by pn...@apache.org on 2020/12/21 14:29:10 UTC

[flink] branch master updated (3c0442e -> cd6967c)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3c0442e  [hotfix] Fix incorrect output type in StreamExecTableSourceScan & BatchExecTableSourceScan
     new f4ba4df  [hotfix][test] Improve error message in ValidatingCheckpointHandler
     new 37e0fb4  [hotfix][test] Fix StreamConfig propagation to StreamTask in StreamTaskMailboxTestHarnessBuilder
     new 5d446d2  [FLINK-19681][checkpointing] Choose controller before processing first barrier or announcement
     new c5c46ab  [FLINK-19681][checkpointing] Timeout aligned checkpoints based on checkpointStartDelay
     new c0b9089  [FLINK-19681][tests] Adjust alignmentTimeout in unaligned checkpoint ITCases
     new 28a8014  [FLINK-19681][config][checkpointing] Un-hide alignment timeout option
     new 6710045  [hotfix][network] Report channel index if failed to deserialize
     new f11452e  [hotfix][checkpointing] Add preconditions to channels and controllers
     new b49ad67  [FLINK-19681][checkpointing] Fix barrier tracking in input channels
     new f40b4eb  [FLINK-19681][checkpointing] Reset channel barrier tracking from AlignedController
     new 033b4f3  [FLINK-19681][checkpointing] Resume consumption when receiving different upstream signals
     new 002a584  [FLINK-19681][checkpointing] Use converted barrier after disabling alignment
     new cf3b861  [FLINK-19681][checkpointing] Address minor feedback
     new b16a62a  [FLINK-19681][checkpointing] Use time of start of alignment instead of checkpoint to timeout
     new 9fdbbfc  [FLINK-19681][checkpointing] Switch controller before processing the first barrier
     new d5043c9  [FLINK-19681][checkpointing] Don't timeout checkpoint on last barrier
     new 26ca3f6  [hotfix][checkpointing] Explicit creation of CheckpointOptions
     new cd6967c  [FLINK-19681][network] Force priority for converted barriers

The 18 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../execution_checkpointing_configuration.html     |   6 +
 .../runtime/checkpoint/CheckpointCoordinator.java  |   2 +-
 .../runtime/checkpoint/CheckpointOptions.java      |  80 +++-
 .../runtime/io/network/api/CheckpointBarrier.java  |   4 +
 .../partition/BufferWritingResultPartition.java    |   3 +
 .../io/network/partition/PrioritizedDeque.java     |  22 +
 .../partition/consumer/ChannelStatePersister.java  |  35 +-
 .../partition/consumer/CheckpointableInput.java    |   2 +
 .../partition/consumer/IndexedInputGate.java       |   7 +
 .../network/partition/consumer/InputChannel.java   |   3 +
 .../partition/consumer/LocalInputChannel.java      |   7 +-
 .../partition/consumer/RemoteInputChannel.java     |  78 +++-
 .../partition/consumer/SingleInputGate.java        |  16 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   5 +-
 .../runtime/checkpoint/CheckpointOptionsTest.java  |  53 +--
 .../api/serialization/EventSerializerTest.java     |   8 +-
 .../PipelinedSubpartitionWithReadViewTest.java     |  15 +-
 .../io/network/partition/PrioritizedDequeTest.java |  27 ++
 .../partition/consumer/LocalInputChannelTest.java  |  41 +-
 .../partition/consumer/RemoteInputChannelTest.java |  91 ++++-
 .../environment/ExecutionCheckpointingOptions.java |   1 -
 .../streaming/runtime/io/AlignedController.java    |  70 +++-
 .../runtime/io/AlternatingController.java          | 113 +++++-
 .../io/CheckpointBarrierBehaviourController.java   |  17 +-
 .../runtime/io/SingleCheckpointBarrierHandler.java | 116 ++++--
 .../runtime/io/StreamTaskNetworkInput.java         |   7 +-
 .../runtime/io/StreamTaskSourceInput.java          |   5 +
 .../streaming/runtime/io/UnalignedController.java  |  30 +-
 .../runtime/tasks/SourceOperatorStreamTask.java    |   6 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |   6 +-
 .../runtime/tasks/mailbox/MailboxProcessor.java    |   5 +
 .../io/AlignedControllerMassiveRandomTest.java     |   6 +-
 .../runtime/io/AlignedControllerTest.java          |  35 +-
 .../runtime/io/AlternatingControllerTest.java      | 445 +++++++++++++++++----
 .../runtime/io/InputProcessorUtilTest.java         |   3 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |   2 +-
 .../io/UnalignedControllerCancellationTest.java    |   2 +-
 .../runtime/io/UnalignedControllerTest.java        |   5 +-
 .../runtime/io/ValidatingCheckpointHandler.java    |  14 +-
 ...tStreamTaskChainedSourcesCheckpointingTest.java |   2 +-
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |   5 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   5 +-
 .../UnalignedCheckpointCompatibilityITCase.java    |   1 +
 .../checkpointing/UnalignedCheckpointTestBase.java |   1 +
 44 files changed, 1121 insertions(+), 286 deletions(-)


[flink] 11/18: [FLINK-19681][checkpointing] Resume consumption when receiving different upstream signals

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 033b4f3b9c702d9841e481428acf4bc7ff7a194f
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Dec 2 23:57:19 2020 +0100

    [FLINK-19681][checkpointing] Resume consumption when receiving different upstream signals
    
    Fixes a hang in 1/12 UC (unaligned checkpoint) tests.
---
 .../streaming/runtime/io/AlternatingController.java |  3 ++-
 .../runtime/io/AlternatingControllerTest.java       | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index f943638..90b79c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -92,9 +92,10 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 				return maybeTimedOut;
 			}
 			else {
-				// TODO: add unit test for this
 				alignedController.resumeConsumption(channelInfo);
 			}
+		} else if (!barrier.getCheckpointOptions().isUnalignedCheckpoint() && activeController == unalignedController) {
+			alignedController.resumeConsumption(channelInfo);
 		}
 		return Optional.empty();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index b12a0ad..7ecf8cb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -68,6 +68,27 @@ import static org.junit.Assert.assertFalse;
  */
 public class AlternatingControllerTest {
 
+	@Test
+	public void testChannelUnblockedAfterDifferentBarriers() throws Exception {
+		CheckpointedInputGate gate = buildGate(new ValidatingCheckpointHandler(), 3);
+		long barrierId = 1L;
+		long ts = System.currentTimeMillis();
+		long timeout = 10;
+
+		send(barrier(barrierId, ts, unaligned(getDefault())), 0, gate);
+
+		TestInputChannel acChannel = (TestInputChannel) gate.getChannel(1);
+		acChannel.setBlocked(true);
+		send(barrier(barrierId, ts, alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), acChannel.getChannelIndex(), gate);
+		assertFalse(acChannel.isBlocked());
+
+		Thread.sleep(timeout);
+		TestInputChannel acChannelWithTimeout = (TestInputChannel) gate.getChannel(2);
+		acChannelWithTimeout.setBlocked(true);
+		send(barrier(barrierId, ts, alignedWithTimeout(getDefault(), timeout)), acChannelWithTimeout.getChannelIndex(), gate);
+		assertFalse(acChannelWithTimeout.isBlocked());
+	}
+
 	/**
 	 * Upon subsuming (or canceling) a checkpoint, channels should be notified regardless of whether UC controller is
 	 * currently being used or not. Otherwise, channels may not capture in-flight buffers.


[flink] 14/18: [FLINK-19681][checkpointing] Use time of start of alignment instead of checkpoint to timeout

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b16a62a6a6a397817891a485ad87ca9772d8be52
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Dec 2 14:36:12 2020 +0100

    [FLINK-19681][checkpointing] Use time of start of alignment instead of checkpoint to timeout
---
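Note (illustrative only, not part of the commit): the idea in the subject line
can be sketched in isolation. The timeout is measured from the moment the first
barrier or announcement of a checkpoint arrives locally, using a monotonic
clock, rather than from the checkpoint's wall-clock trigger timestamp. The
class name AlignmentTimeoutSketch, the injected clock, and the boolean flag
(used here in place of a sentinel arrival time) are assumptions made for this
sketch; they are not Flink APIs.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration; not a Flink class. */
final class AlignmentTimeoutSketch {
    private final LongSupplier nanoClock; // injectable so tests need no sleeping
    private long lastSeenBarrier = -1L;
    private long firstArrivalNanos;
    private boolean timeoutable;

    AlignmentTimeoutSketch(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** Remembers the local arrival time the first time a checkpoint id is seen. */
    void onBarrierOrAnnouncement(long checkpointId, boolean isTimeoutable) {
        if (lastSeenBarrier < checkpointId) {
            lastSeenBarrier = checkpointId;
            timeoutable = isTimeoutable;
            firstArrivalNanos = nanoClock.getAsLong();
        }
    }

    /** True once more than the timeout has elapsed since the first local arrival. */
    boolean canTimeout(long checkpointId, long alignmentTimeoutMillis) {
        if (!timeoutable || checkpointId > lastSeenBarrier) {
            return false;
        }
        // Subtracting two nanoTime-style readings stays correct even if the counter wraps.
        long elapsedNanos = nanoClock.getAsLong() - firstArrivalNanos;
        return elapsedNanos > TimeUnit.MILLISECONDS.toNanos(alignmentTimeoutMillis);
    }

    public static void main(String[] args) {
        AlignmentTimeoutSketch sketch = new AlignmentTimeoutSketch(System::nanoTime);
        sketch.onBarrierOrAnnouncement(1L, true);
        System.out.println(sketch.canTimeout(1L, 10_000L));
    }
}
```

Repeated calls for the same checkpoint id leave the recorded arrival time
untouched, so later barriers of that checkpoint do not restart the timer.
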
 .../streaming/runtime/io/AlternatingController.java     | 17 ++++++++++++++++-
 .../streaming/runtime/io/AlternatingControllerTest.java |  1 +
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 080a7ce..7d8761a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -39,6 +39,8 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	private final UnalignedController unalignedController;
 
 	private CheckpointBarrierBehaviourController activeController;
+	private long firstBarrierArrivalTime = Long.MAX_VALUE;
+	private long lastSeenBarrier = -1L;
 
 	public AlternatingController(
 			AlignedController alignedController,
@@ -58,6 +60,10 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 			InputChannelInfo channelInfo,
 			CheckpointBarrier announcedBarrier,
 			int sequenceNumber) throws IOException {
+		if (lastSeenBarrier < announcedBarrier.getId()) {
+			lastSeenBarrier = announcedBarrier.getId();
+			firstBarrierArrivalTime = getArrivalTime(announcedBarrier);
+		}
 
 		Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(announcedBarrier);
 		announcedBarrier = maybeTimedOut.orElse(announcedBarrier);
@@ -103,6 +109,10 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	public Optional<CheckpointBarrier> preProcessFirstBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException, CheckpointException {
+		if (lastSeenBarrier < barrier.getId()) {
+			lastSeenBarrier = barrier.getId();
+			firstBarrierArrivalTime = getArrivalTime(barrier);
+		}
 		return activeController.preProcessFirstBarrier(channelInfo, barrier);
 	}
 
@@ -191,6 +201,11 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 
 	private boolean canTimeout(CheckpointBarrier barrier) {
 		return barrier.getCheckpointOptions().isTimeoutable() &&
-			barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+			barrier.getId() <= lastSeenBarrier &&
+			barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+	}
+
+	private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+		return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 7ecf8cb..34485f5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -199,6 +199,7 @@ public class AlternatingControllerTest {
 		// First announcements and prioritsed barriers
 		List<AbstractEvent> events = new ArrayList<>();
 		events.add(gate.pollNext().get().getEvent());
+		Thread.sleep(alignmentTimeout * 2);
 		events.add(gate.pollNext().get().getEvent());
 		events.add(gate.pollNext().get().getEvent());
 		events.add(gate.pollNext().get().getEvent());


[flink] 16/18: [FLINK-19681][checkpointing] Don't timeout checkpoint on last barrier

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5043c9335347a512f11173dd815dfe08a244973
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Dec 2 14:58:24 2020 +0100

    [FLINK-19681][checkpointing] Don't timeout checkpoint on last barrier
---
 .../runtime/io/AlternatingController.java          | 23 +---------------------
 1 file changed, 1 insertion(+), 22 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index e3f816c..0a45f97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -145,28 +145,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 
 	@Override
 	public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-		Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier);
-		if (maybeTimeOut.isPresent() && activeController == alignedController) {
-			switchToUnaligned(channelInfo, maybeTimeOut.get());
-			checkState(activeController == unalignedController);
-			checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-			return maybeTimeOut;
-		}
-
-		barrier = maybeTimeOut.orElse(barrier);
-		if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-			checkState(activeController == unalignedController);
-			checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-			return Optional.empty();
-		}
-		else {
-			checkState(activeController == alignedController);
-			Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier(
-				channelInfo,
-				barrier);
-			checkState(triggerResult.isPresent());
-			return triggerResult;
-		}
+		return activeController.postProcessLastBarrier(channelInfo, barrier);
 	}
 
 	@Override


[flink] 10/18: [FLINK-19681][checkpointing] Reset channel barrier tracking from AlignedController

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f40b4eb7703ce2a99fe8f8ef775a63197d45694c
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Nov 9 19:21:30 2020 +0100

    [FLINK-19681][checkpointing] Reset channel barrier tracking from AlignedController
    
    Input channels are unaware of controller types and always update their
    pendingCheckpointBarrierId.  Therefore, resetting it also should be done
    in either case.  Otherwise, pendingCheckpointBarrierId may be left in a
    wrong state upon receiving a new barrier.
---
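Note (illustrative only, not part of the commit): a minimal sketch of the
reasoning above. Inputs record a pending barrier id no matter which controller
is active, so the aligned path also has to clear it, both when the last barrier
arrives and when the checkpoint is aborted. ResetTrackingSketch, TrackedInput,
and the "clear if pending <= stopped id" rule are assumptions for this sketch;
the real channel state handling in Flink is more involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an aligned controller; not a Flink class. */
final class ResetTrackingSketch {
    static final long NONE = -1L;

    /** Hypothetical input that remembers which barrier it is still waiting for. */
    static final class TrackedInput {
        private long pendingCheckpointBarrierId = NONE;

        void checkpointStarted(long checkpointId) {
            pendingCheckpointBarrierId = checkpointId;
        }

        /** Assumed rule: stopping checkpoint N clears any pending id up to N. */
        void checkpointStopped(long checkpointId) {
            if (pendingCheckpointBarrierId <= checkpointId) {
                pendingCheckpointBarrierId = NONE;
            }
        }

        long getPending() {
            return pendingCheckpointBarrierId;
        }
    }

    private final List<TrackedInput> inputs;

    ResetTrackingSketch(List<TrackedInput> inputs) {
        this.inputs = new ArrayList<>(inputs);
    }

    /** Completion path: reset tracking on every input. */
    void postProcessLastBarrier(long checkpointId) {
        resetPendingCheckpoint(checkpointId);
    }

    /** Abort path: reset tracking on every input as well. */
    void abortPendingCheckpoint(long cancelledId) {
        resetPendingCheckpoint(cancelledId);
    }

    private void resetPendingCheckpoint(long checkpointId) {
        for (TrackedInput input : inputs) {
            input.checkpointStopped(checkpointId);
        }
    }
}
```

Without the reset on both paths, an input could still hold the id of a
finished checkpoint when the next barrier arrives.
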
 .../flink/streaming/runtime/io/AlignedController.java    |  8 ++++++++
 .../runtime/io/AlignedControllerMassiveRandomTest.java   |  6 +++++-
 .../streaming/runtime/io/AlignedControllerTest.java      |  6 +++++-
 .../streaming/runtime/io/AlternatingControllerTest.java  | 16 ++++++++++++++++
 4 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index 0991dc9..be6f2ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -102,6 +102,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException {
 		checkState(!barrier.getCheckpointOptions().isUnalignedCheckpoint());
+		resetPendingCheckpoint(barrier.getId());
 		resumeConsumption();
 		return Optional.of(barrier);
 	}
@@ -110,6 +111,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 	public void abortPendingCheckpoint(
 			long cancelledId,
 			CheckpointException exception) throws IOException {
+		resetPendingCheckpoint(cancelledId);
 		resumeConsumption();
 	}
 
@@ -120,6 +122,12 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 		resumeConsumption(channelInfo);
 	}
 
+	protected void resetPendingCheckpoint(long cancelledId) {
+		for (final CheckpointableInput input : inputs) {
+			input.checkpointStopped(cancelledId);
+		}
+	}
+
 	public Collection<InputChannelInfo> getBlockedChannels() {
 		return blockedChannels.entrySet()
 			.stream()
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
index 0dae4a3..da9bdec 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerMassiveRandomTest.java
@@ -71,7 +71,11 @@ public class AlignedControllerMassiveRandomTest {
 						"Testing: No task associated",
 						new DummyCheckpointInvokable(),
 						myIG.getNumberOfInputChannels(),
-						new AlignedController(myIG)),
+						new AlignedController(myIG) {
+							@Override
+							protected void resetPendingCheckpoint(long cancelledId) {
+							}
+					}),
 					new SyncMailboxExecutor());
 
 			for (int i = 0; i < 2000000; i++) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
index b9eac08..4538f96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
@@ -133,7 +133,11 @@ public class AlignedControllerTest {
 				"Testing",
 				toNotify,
 				gate.getNumberOfInputChannels(),
-				new AlignedController(gate)),
+				new AlignedController(gate) {
+					@Override
+					protected void resetPendingCheckpoint(long cancelledId) {
+					}
+				}),
 			new SyncMailboxExecutor());
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 8d81df2..b12a0ad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -68,6 +68,22 @@ import static org.junit.Assert.assertFalse;
  */
 public class AlternatingControllerTest {
 
+	/**
+	 * Upon subsuming (or canceling) a checkpoint, channels should be notified regardless of whether UC controller is
+	 * currently being used or not. Otherwise, channels may not capture in-flight buffers.
+	 */
+	@Test
+	public void testChannelResetOnNewBarrier() throws Exception {
+		RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
+		CheckpointedInputGate gate = buildRemoteInputGate(new ValidatingCheckpointHandler(), 2, stateWriter);
+
+		sendBarrier(0, System.currentTimeMillis(), SAVEPOINT, gate, 0); // using AC because UC would require ordering in gate while polling
+		((RemoteInputChannel) gate.getChannel(0)).onBuffer(createBuffer(1024), 1, 0); // to be captured
+		send(toBuffer(new CheckpointBarrier(1, System.currentTimeMillis(), unaligned(getDefault())), true), 1, gate);
+
+		assertFalse(stateWriter.getAddedInput().isEmpty());
+	}
+
 	@Test
 	public void testCheckpointHandling() throws Exception {
 		testBarrierHandling(CHECKPOINT);


[flink] 18/18: [FLINK-19681][network] Force priority for converted barriers

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd6967c7259de29e04fbcb6c5e31fa483f98faf6
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Dec 3 21:57:48 2020 +0100

    [FLINK-19681][network] Force priority for converted barriers
    
    Without this, the gate interprets the barrier as outdated
    because it has already seen its SQN during the announcement.
    
    Preventing announcements from updating the gate's lastSeenSqn
    doesn't work because it provokes a concurrency issue with
    notification (by effectively disabling the lastSeenSqn guard).
---
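Note (illustrative only, not part of the commit): the guard described above can
be sketched as follows. A normal priority notification is dropped when its
sequence number was already seen for that channel; a forced notification skips
that check. ForcePrioritySketch, markSeen, and the plain "<=" comparison are
assumptions for this sketch and simplify what the gate actually does.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Hypothetical stand-in for the gate's priority bookkeeping; not a Flink class. */
final class ForcePrioritySketch {
    private final int[] lastPrioritySequenceNumber;
    private final Deque<Integer> priorityChannels = new ArrayDeque<>();

    ForcePrioritySketch(int numChannels) {
        lastPrioritySequenceNumber = new int[numChannels];
        Arrays.fill(lastPrioritySequenceNumber, Integer.MIN_VALUE);
    }

    /** Records that the gate has already seen this SQN, e.g. via an announcement. */
    void markSeen(int channel, int sequenceNumber) {
        lastPrioritySequenceNumber[channel] = sequenceNumber;
    }

    boolean notifyPriorityEvent(int channel, int sequenceNumber) {
        return queueChannel(channel, sequenceNumber, false);
    }

    boolean notifyPriorityEventForce(int channel) {
        return queueChannel(channel, null, true);
    }

    boolean hasPriorityEvent() {
        return !priorityChannels.isEmpty();
    }

    /** Returns true if the channel ended up queued as a priority channel. */
    private boolean queueChannel(int channel, Integer sequenceNumber, boolean forcePriority) {
        boolean priority = sequenceNumber != null || forcePriority;
        if (!forcePriority && priority
                && sequenceNumber <= lastPrioritySequenceNumber[channel]) {
            return false; // already seen: treated as outdated and ignored
        }
        if (priority && !priorityChannels.contains(channel)) {
            priorityChannels.addFirst(channel);
        }
        return priority;
    }

    public static void main(String[] args) {
        ForcePrioritySketch gate = new ForcePrioritySketch(1);
        gate.markSeen(0, 0);
        System.out.println(gate.notifyPriorityEvent(0, 0));
        System.out.println(gate.notifyPriorityEventForce(0));
    }
}
```

The forced path is what lets a converted barrier wake the gate even though its
SQN was already recorded when the announcement was processed.
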
 .../partition/consumer/RemoteInputChannel.java     | 13 ++++++++++-
 .../partition/consumer/SingleInputGate.java        | 16 ++++++++-----
 .../partition/consumer/RemoteInputChannelTest.java | 26 +++++++++++++++++++++-
 3 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index c012766..e765b17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -565,14 +565,25 @@ public class RemoteInputChannel extends InputChannel {
 				"Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
 				toPrioritize,
 				numPriorityElementsBeforeRemoval);
+			// set the priority flag (checked on poll)
+			// don't convert the barrier itself (barrier controller might not have been switched yet)
+			AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader());
+			toPrioritize.buffer.setReaderIndex(0);
+			toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber);
 			firstPriorityEvent = addPriorityBuffer(toPrioritize); 	// note that only position of the element is changed
 																	// converting the event itself would require switching the controller sooner
 		}
 		if (firstPriorityEvent) {
-			notifyPriorityEvent(sequenceNumber);
+			notifyPriorityEventForce(); // forcibly notify about the priority event
+										// instead of passing barrier SQN to be checked
+										// because this SQN might have been seen by the input gate during the announcement
 		}
 	}
 
+	private void notifyPriorityEventForce() {
+		inputGate.notifyPriorityEventForce(this);
+	}
+
 	/**
 	 * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index fa4699c..f8158ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -773,7 +773,7 @@ public class SingleInputGate extends IndexedInputGate {
 	// ------------------------------------------------------------------------
 
 	void notifyChannelNonEmpty(InputChannel channel) {
-		queueChannel(checkNotNull(channel), null);
+		queueChannel(checkNotNull(channel), null, false);
 	}
 
 	/**
@@ -782,10 +782,14 @@ public class SingleInputGate extends IndexedInputGate {
 	 * <p>The buffer number limits the notification to the respective buffer and voids the whole notification in case
 	 * that the buffer has been polled in the meantime. That is, if task thread polls the enqueued priority buffer
 	 * before this notification occurs (notification is not performed under lock), this buffer number allows
-	 * {@link #queueChannel(InputChannel, Integer)} to avoid spurious priority wake-ups.
+	 * {@link #queueChannel(InputChannel, Integer, boolean)} to avoid spurious priority wake-ups.
 	 */
 	void notifyPriorityEvent(InputChannel inputChannel, int prioritySequenceNumber) {
-		queueChannel(checkNotNull(inputChannel), prioritySequenceNumber);
+		queueChannel(checkNotNull(inputChannel), prioritySequenceNumber, false);
+	}
+
+	void notifyPriorityEventForce(InputChannel inputChannel) {
+		queueChannel(checkNotNull(inputChannel), null, true);
 	}
 
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
@@ -805,12 +809,12 @@ public class SingleInputGate extends IndexedInputGate {
 			}));
 	}
 
-	private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) {
+	private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber, boolean forcePriority) {
 		try (GateNotificationHelper notification = new GateNotificationHelper(this, inputChannelsWithData)) {
 			synchronized (inputChannelsWithData) {
-				boolean priority = prioritySequenceNumber != null;
+				boolean priority = prioritySequenceNumber != null || forcePriority;
 
-				if (priority &&
+				if (!forcePriority && priority &&
 						isOutdated(prioritySequenceNumber, lastPrioritySequenceNumber[channel.getChannelIndex()])) {
 					// priority event at the given offset already polled (notification is not atomic in respect to
 					// buffer enqueuing), so just ignore the notification
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index d13cb72..08358da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -75,6 +75,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedWithTimeout;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
@@ -109,7 +110,30 @@ public class RemoteInputChannelTest {
 
 	private static final long CHECKPOINT_ID = 1L;
 	private static final CheckpointOptions UNALIGNED = CheckpointOptions.unaligned(getDefault());
-	private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = CheckpointOptions.alignedWithTimeout(getDefault(), 10);
+	private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = alignedWithTimeout(getDefault(), 10);
+
+	@Test
+	public void testGateNotifiedOnBarrierConversion() throws IOException, InterruptedException {
+		final int sequenceNumber = 0;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 4096);
+		try {
+			SingleInputGate inputGate = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(1, 1)).build();
+			inputGate.setup();
+			RemoteInputChannel channel = InputChannelBuilder.newBuilder()
+				.setConnectionManager(new TestVerifyConnectionManager(new TestVerifyPartitionRequestClient()))
+				.buildRemoteChannel(inputGate);
+			channel.requestSubpartition(0);
+
+			channel.onBuffer(toBuffer(new CheckpointBarrier(1L, 123L, alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), false), sequenceNumber, 0);
+			inputGate.pollNext(); // process announcement to let the gate remember the SQN
+
+			channel.convertToPriorityEvent(sequenceNumber);
+			assertTrue(inputGate.getPriorityEventAvailableFuture().isDone());
+
+		} finally {
+			networkBufferPool.destroy();
+		}
+	}
 
 	@Test
 	public void testExceptionOnReordering() throws Exception {


[flink] 04/18: [FLINK-19681][checkpointing] Timeout aligned checkpoints based on checkpointStartDelay

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5c46abbcd8662f7162f934b16209ef3a60585a9
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sun Oct 25 18:04:41 2020 +0100

    [FLINK-19681][checkpointing] Timeout aligned checkpoints based on checkpointStartDelay
---
 .../runtime/checkpoint/CheckpointOptions.java      |  11 +
 .../partition/BufferWritingResultPartition.java    |   3 +
 .../io/network/partition/PrioritizedDeque.java     |  22 ++
 .../partition/consumer/ChannelStatePersister.java  |  11 +-
 .../partition/consumer/CheckpointableInput.java    |   2 +
 .../partition/consumer/IndexedInputGate.java       |   7 +
 .../network/partition/consumer/InputChannel.java   |   3 +
 .../partition/consumer/RemoteInputChannel.java     |  47 ++++
 .../io/network/partition/PrioritizedDequeTest.java |  27 ++
 .../partition/consumer/RemoteInputChannelTest.java |  41 +++
 .../streaming/runtime/io/AlignedController.java    |  54 +++-
 .../runtime/io/AlternatingController.java          | 125 ++++++++-
 .../io/CheckpointBarrierBehaviourController.java   |  12 +-
 .../runtime/io/SingleCheckpointBarrierHandler.java |  70 +++--
 .../runtime/io/StreamTaskSourceInput.java          |   5 +
 .../streaming/runtime/io/UnalignedController.java  |  22 +-
 .../runtime/tasks/mailbox/MailboxProcessor.java    |   5 +
 .../runtime/io/AlignedControllerTest.java          |  29 ++
 .../runtime/io/AlternatingControllerTest.java      | 306 ++++++++++++++++++++-
 .../runtime/io/ValidatingCheckpointHandler.java    |   8 +-
 20 files changed, 754 insertions(+), 56 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 126ba0b..b092a92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Options for performing the checkpoint.
@@ -188,4 +189,14 @@ public class CheckpointOptions implements Serializable {
 			isUnalignedCheckpoint,
 			alignmentTimeout);
 	}
+
+	public CheckpointOptions toTimeouted() {
+		checkState(checkpointType == CheckpointType.CHECKPOINT);
+		return create(
+			checkpointType,
+			targetLocation,
+			isExactlyOnceMode,
+			true,
+			0);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 2ba000a..415ed19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -235,6 +235,9 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
 	private BufferBuilder appendUnicastDataForNewRecord(
 			final ByteBuffer record,
 			final int targetSubpartition) throws IOException {
+		if (targetSubpartition < 0 || targetSubpartition >= unicastBufferBuilders.length) {
+			throw new ArrayIndexOutOfBoundsException(targetSubpartition);
+		}
 		BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];
 
 		if (buffer == null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
index dabb4a3..77108d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
@@ -26,7 +26,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -142,6 +144,26 @@ public final class PrioritizedDeque<T> implements Iterable<T> {
 	}
 
 	/**
+	 * Find first element matching the {@link Predicate}, remove it from the {@link PrioritizedDeque}
+	 * and return it.
+	 * @return removed element
+	 */
+	public T getAndRemove(Predicate<T> preCondition) {
+		Iterator<T> iterator = deque.iterator();
+		for (int i = 0; i < deque.size(); i++) {
+			T next = iterator.next();
+			if (preCondition.test(next)) {
+				if (i < numPriorityElements) {
+					numPriorityElements--;
+				}
+				iterator.remove();
+				return next;
+			}
+		}
+		throw new NoSuchElementException();
+	}
+
+	/**
 	 * Polls the first priority element or non-priority element if the former does not exist.
 	 *
 	 * @return the first element or null.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
index 416c956..924e30f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
@@ -21,6 +21,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EventAnnouncement;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.CloseableIterator;
@@ -47,9 +48,7 @@ final class ChannelStatePersister {
 
 	private long lastSeenBarrier = -1L;
 
-	/**
-	 * Writer must be initialized before usage. {@link #startPersisting(long, List)} enforces this invariant.
-	 */
+	/** Writer must be initialized before usage. {@link #startPersisting(long, List)} enforces this invariant. */
 	private final ChannelStateWriter channelStateWriter;
 
 	ChannelStatePersister(ChannelStateWriter channelStateWriter, InputChannelInfo channelInfo) {
@@ -97,6 +96,12 @@ final class ChannelStatePersister {
 				return Optional.of(lastSeenBarrier);
 			}
 		}
+		else if (priorityEvent instanceof EventAnnouncement) {
+			EventAnnouncement announcement = (EventAnnouncement) priorityEvent;
+			if (announcement.getAnnouncedEvent() instanceof CheckpointBarrier) {
+				return Optional.of(((CheckpointBarrier) announcement.getAnnouncedEvent()).getId());
+			}
+		}
 		return Optional.empty();
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java
index 2316a90..75a2ba6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java
@@ -44,4 +44,6 @@ public interface CheckpointableInput {
 	void checkpointStopped(long cancelledCheckpointId);
 
 	int getInputGateIndex();
+
+	void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 3b8ecb2..6566fd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -21,6 +21,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
+import java.io.IOException;
+
 /**
  * An {@link InputGate} with a specific index.
  */
@@ -53,4 +55,9 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab
 	public void blockConsumption(InputChannelInfo channelInfo) {
 		// Unused. Network stack is blocking consumption automatically by revoking credits.
 	}
+
+	@Override
+	public void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException {
+		getChannel(channelIndex).convertToPriorityEvent(sequenceNumber);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 792392b9..e1909b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -183,6 +183,9 @@ public abstract class InputChannel {
 	public void checkpointStopped(long checkpointId) {
 	}
 
+	public void convertToPriorityEvent(int sequenceNumber) throws IOException {
+	}
+
 	// ------------------------------------------------------------------------
 	// Task events
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 1992cdb..27fa58f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -494,6 +494,7 @@ public class RemoteInputChannel extends InputChannel {
 
 	private SequenceBuffer announce(SequenceBuffer sequenceBuffer) throws IOException {
 		checkState(!sequenceBuffer.buffer.isBuffer(), "Only a CheckpointBarrier can be announced but found %s", sequenceBuffer.buffer);
+		checkAnnouncedOnlyOnce(sequenceBuffer);
 		AbstractEvent event = EventSerializer.fromBuffer(
 				sequenceBuffer.buffer,
 				getClass().getClassLoader());
@@ -504,6 +505,20 @@ public class RemoteInputChannel extends InputChannel {
 				sequenceBuffer.sequenceNumber);
 	}
 
+	private void checkAnnouncedOnlyOnce(SequenceBuffer sequenceBuffer) {
+		Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
+		int count = 0;
+		while (iterator.hasNext()) {
+			if (iterator.next().sequenceNumber == sequenceBuffer.sequenceNumber) {
+				count++;
+			}
+		}
+		checkState(
+			count == 1,
+			"Before enqueuing the announcement there should be exactly one occurrence of the buffer, but found [%s]",
+			count);
+	}
+
 	/**
 	 * Spills all queued buffers on checkpoint start. If barrier has already been received (and reordered), spill only
 	 * the overtaken buffers.
@@ -533,6 +548,29 @@ public class RemoteInputChannel extends InputChannel {
 		}
 	}
 
+	@Override
+	public void convertToPriorityEvent(int sequenceNumber) throws IOException {
+		boolean firstPriorityEvent;
+		synchronized (receivedBuffers) {
+			checkState(!channelStatePersister.hasBarrierReceived());
+			int numPriorityElementsBeforeRemoval = receivedBuffers.getNumPriorityElements();
+			SequenceBuffer toPrioritize = receivedBuffers.getAndRemove(
+				sequenceBuffer -> sequenceBuffer.sequenceNumber == sequenceNumber);
+			checkState(lastBarrierSequenceNumber == sequenceNumber);
+			checkState(!toPrioritize.buffer.isBuffer());
+			checkState(
+				numPriorityElementsBeforeRemoval == receivedBuffers.getNumPriorityElements(),
+				"Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
+				toPrioritize,
+				numPriorityElementsBeforeRemoval);
+			firstPriorityEvent = addPriorityBuffer(toPrioritize); 	// note that only position of the element is changed
+																	// converting the event itself would require switching the controller sooner
+		}
+		if (firstPriorityEvent) {
+			notifyPriorityEvent(sequenceNumber);
+		}
+	}
+
 	/**
 	 * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
 	 */
@@ -653,5 +691,14 @@ public class RemoteInputChannel extends InputChannel {
 			this.buffer = buffer;
 			this.sequenceNumber = sequenceNumber;
 		}
+
+		@Override
+		public String toString() {
+			return String.format(
+				"SequenceBuffer(isEvent = %s, dataType = %s, sequenceNumber = %s)",
+				!buffer.isBuffer(),
+				buffer.getDataType(),
+				sequenceNumber);
+		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java
index 8932c86..5cc8a3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java
@@ -19,7 +19,11 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.junit.Test;
 
+import java.util.NoSuchElementException;
+
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests PrioritizedDeque.
@@ -53,4 +57,27 @@ public class PrioritizedDequeTest {
 
 		assertArrayEquals(new Integer[] { 3, 0, 1, 2 }, deque.asUnmodifiableCollection().toArray(new Integer[0]));
 	}
+
+	@Test
+	public void testGetAndRemove() {
+		final PrioritizedDeque<Integer> deque = new PrioritizedDeque<>();
+
+		deque.add(0);
+		deque.add(1);
+		deque.add(2);
+		deque.add(1);
+		deque.add(3);
+
+		assertEquals(1, deque.getAndRemove(v -> v == 1).intValue());
+		assertArrayEquals(new Integer[] { 0, 2, 1, 3 }, deque.asUnmodifiableCollection().toArray(new Integer[0]));
+		assertEquals(1, deque.getAndRemove(v -> v == 1).intValue());
+		assertArrayEquals(new Integer[] { 0, 2, 3 }, deque.asUnmodifiableCollection().toArray(new Integer[0]));
+		try {
+			int removed = deque.getAndRemove(v -> v == 1);
+			fail(String.format("This should not happen. Item [%s] was removed, but it shouldn't be found", removed));
+		}
+		catch (NoSuchElementException ex) {
+			// expected
+		}
+	}
 }
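
Editorial sketch, not part of the commit: the getAndRemove semantics tested above, and how convertToPriorityEvent in RemoteInputChannel builds on them, can be illustrated with a simplified stand-in. The class PrioritizedDequeSketch and its methods addPriorityElement, snapshot and getNumPriorityElements are hypothetical names for this illustration; it assumes priority elements form a prefix of the queue, and it uses an ArrayList rather than whatever the real PrioritizedDeque uses internally.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for PrioritizedDeque; not the Flink class. */
final class PrioritizedDequeSketch<T> {
    // Assumption: priority elements occupy indexes [0, numPriorityElements).
    private final List<T> elements = new ArrayList<>();
    private int numPriorityElements;

    /** Appends a non-priority element at the tail. */
    void add(T element) {
        elements.add(element);
    }

    /** Inserts an element right behind the existing priority elements. */
    void addPriorityElement(T element) {
        elements.add(numPriorityElements++, element);
    }

    /** Removes and returns the first element matching the predicate. */
    T getAndRemove(Predicate<T> predicate) {
        Iterator<T> iterator = elements.iterator();
        for (int i = 0; iterator.hasNext(); i++) {
            T next = iterator.next();
            if (predicate.test(next)) {
                // Keep the priority counter consistent if a priority element is removed.
                if (i < numPriorityElements) {
                    numPriorityElements--;
                }
                iterator.remove();
                return next;
            }
        }
        throw new NoSuchElementException();
    }

    int getNumPriorityElements() {
        return numPriorityElements;
    }

    List<T> snapshot() {
        return new ArrayList<>(elements);
    }

    public static void main(String[] args) {
        PrioritizedDequeSketch<Integer> deque = new PrioritizedDequeSketch<>();
        deque.add(0);
        deque.add(1);
        deque.add(2); // stands in for the barrier's sequence number
        deque.add(3);

        // Same two steps as convertToPriorityEvent: take the element out, re-add it as priority.
        Integer barrier = deque.getAndRemove(v -> v == 2);
        deque.addPriorityElement(barrier);

        System.out.println(deque.snapshot() + " priority=" + deque.getNumPriorityElements());
    }
}
```

In this sketch only the position of the element changes, which matches the inline note in convertToPriorityEvent that the event itself is not converted.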
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index c74cf4e..2954a3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -1232,6 +1232,47 @@ public class RemoteInputChannelTest {
 		assertEquals(3, channel.getNextBuffer().get().getSequenceNumber());
 	}
 
+	@Test
+	public void testGetInflightBuffersBeforeProcessingAnnouncement() throws Exception {
+		int bufferSize = 1;
+		int sequenceNumber = 0;
+		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBarrier(channel, sequenceNumber++, 10);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		assertInflightBufferSizes(channel, 1, 2);
+	}
+
+	@Test
+	public void testGetInflightBuffersAfterProcessingAnnouncement() throws Exception {
+		int bufferSize = 1;
+		int sequenceNumber = 0;
+		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBarrier(channel, sequenceNumber++, 10);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		assertGetNextBufferSequenceNumbers(channel, 2);
+		assertInflightBufferSizes(channel, 1, 2);
+	}
+
+	@Test
+	public void testGetInflightBuffersAfterProcessingAnnouncementAndBuffer() throws Exception {
+		int bufferSize = 1;
+		int sequenceNumber = 0;
+		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBarrier(channel, sequenceNumber++, 10);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		sendBuffer(channel, sequenceNumber++, bufferSize++);
+		assertGetNextBufferSequenceNumbers(channel, 2, 0);
+		assertInflightBufferSizes(channel, 2);
+	}
+
 	private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, int alignmentTimeout) throws IOException {
 		CheckpointOptions checkpointOptions = CheckpointOptions.create(
 			CHECKPOINT,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index f3999a4..a2ef674 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -26,7 +26,10 @@ import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -39,13 +42,35 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class AlignedController implements CheckpointBarrierBehaviourController {
 	private final CheckpointableInput[] inputs;
 
+	/**
+	 * {@link #blockedChannels} are the channels for which we have already processed a {@link CheckpointBarrier}
+	 * (via {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)}). {@link #sequenceNumberInAnnouncedChannels},
+	 * on the other hand, holds the channels for which we have processed {@link #barrierAnnouncement(InputChannelInfo, CheckpointBarrier, int)}
+	 * but not yet {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)}.
+	 */
 	private final Map<InputChannelInfo, Boolean> blockedChannels;
+	private final Map<InputChannelInfo, Integer> sequenceNumberInAnnouncedChannels;
 
 	public AlignedController(CheckpointableInput... inputs) {
 		this.inputs = inputs;
 		blockedChannels = Arrays.stream(inputs)
 			.flatMap(gate -> gate.getChannelInfos().stream())
 			.collect(Collectors.toMap(Function.identity(), info -> false));
+		sequenceNumberInAnnouncedChannels = new HashMap<>();
+	}
+
+	@Override
+	public void barrierAnnouncement(
+			InputChannelInfo channelInfo,
+			CheckpointBarrier announcedBarrier,
+			int sequenceNumber) {
+		Integer previousValue = sequenceNumberInAnnouncedChannels.put(channelInfo, sequenceNumber);
+		checkState(
+			previousValue == null,
+			"Stream corrupt: Repeated barrierAnnouncement [%s] overwriting [%s] for the same checkpoint on input %s",
+			announcedBarrier,
+			sequenceNumber,
+			channelInfo);
 	}
 
 	@Override
@@ -53,27 +78,29 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 	}
 
 	@Override
-	public void barrierReceived(
+	public Optional<CheckpointBarrier> barrierReceived(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) {
 		checkState(!blockedChannels.put(channelInfo, true), "Stream corrupt: Repeated barrier for same checkpoint on input " + channelInfo);
+		sequenceNumberInAnnouncedChannels.remove(channelInfo);
 		CheckpointableInput input = inputs[channelInfo.getGateIdx()];
 		input.blockConsumption(channelInfo);
+		return Optional.empty();
 	}
 
 	@Override
-	public boolean preProcessFirstBarrier(
+	public Optional<CheckpointBarrier> preProcessFirstBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) {
-		return false;
+		return Optional.empty();
 	}
 
 	@Override
-	public boolean postProcessLastBarrier(
+	public Optional<CheckpointBarrier> postProcessLastBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException {
 		resumeConsumption();
-		return true;
+		return Optional.of(barrier);
 	}
 
 	@Override
@@ -90,16 +117,29 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 		resumeConsumption(channelInfo);
 	}
 
-	private void resumeConsumption() throws IOException {
+	public Collection<InputChannelInfo> getBlockedChannels() {
+		return blockedChannels.entrySet()
+			.stream()
+			.filter(entry -> entry.getValue())
+			.map(entry -> entry.getKey())
+			.collect(Collectors.toSet());
+	}
+
+	public Map<InputChannelInfo, Integer> getSequenceNumberInAnnouncedChannels() {
+		return new HashMap<>(sequenceNumberInAnnouncedChannels);
+	}
+
+	public void resumeConsumption() throws IOException {
 		for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : blockedChannels.entrySet()) {
 			if (blockedChannel.getValue()) {
 				resumeConsumption(blockedChannel.getKey());
 			}
 			blockedChannel.setValue(false);
 		}
+		sequenceNumberInAnnouncedChannels.clear();
 	}
 
-	private void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
+	void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
 		CheckpointableInput input = inputs[channelInfo.getGateIdx()];
 		input.resumeConsumption(channelInfo);
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index d040c66..f943638 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -20,10 +20,14 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -34,7 +38,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class AlternatingController implements CheckpointBarrierBehaviourController {
 	private final AlignedController alignedController;
 	private final UnalignedController unalignedController;
-	private  CheckpointBarrierBehaviourController activeController;
+
+	private CheckpointBarrierBehaviourController activeController;
+	private long timeOutedBarrierId = -1; // used to shortcut timeout check
 
 	public AlternatingController(
 			AlignedController alignedController,
@@ -49,23 +55,107 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	}
 
 	@Override
-	public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
-		checkActiveController(barrier);
-		activeController.barrierReceived(channelInfo, barrier);
+	public void barrierAnnouncement(
+			InputChannelInfo channelInfo,
+			CheckpointBarrier announcedBarrier,
+			int sequenceNumber) throws IOException {
+
+		Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(announcedBarrier);
+		announcedBarrier = maybeTimedOut.orElse(announcedBarrier);
+
+		if (maybeTimedOut.isPresent() && activeController != unalignedController) {
+			// Let's timeout this barrier
+			unalignedController.barrierAnnouncement(channelInfo, announcedBarrier, sequenceNumber);
+		}
+		else {
+			// Either we have already timed out before, or we are still going with aligned checkpoints
+			activeController.barrierAnnouncement(channelInfo, announcedBarrier, sequenceNumber);
+		}
+	}
+
+	@Override
+	public Optional<CheckpointBarrier> barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
+		if (barrier.getCheckpointOptions().isUnalignedCheckpoint() && activeController == alignedController) {
+			switchToUnaligned(channelInfo, barrier);
+			activeController.barrierReceived(channelInfo, barrier);
+			return Optional.of(barrier);
+		}
+
+		Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(barrier);
+		barrier = maybeTimedOut.orElse(barrier);
+
+		checkState(!activeController.barrierReceived(channelInfo, barrier).isPresent());
+
+		if (maybeTimedOut.isPresent()) {
+			if (activeController == alignedController) {
+				switchToUnaligned(channelInfo, barrier);
+				return maybeTimedOut;
+			}
+			else {
+				// TODO: add unit test for this
+				alignedController.resumeConsumption(channelInfo);
+			}
+		}
+		return Optional.empty();
 	}
 
 	@Override
-	public boolean preProcessFirstBarrier(
+	public Optional<CheckpointBarrier> preProcessFirstBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException, CheckpointException {
-		checkActiveController(barrier);
 		return activeController.preProcessFirstBarrier(channelInfo, barrier);
 	}
 
+	private void switchToUnaligned(
+			InputChannelInfo channelInfo,
+			CheckpointBarrier barrier) throws IOException, CheckpointException {
+		checkState(alignedController == activeController);
+
+		// timeout all not yet processed barriers for which alignedController has processed an announcement
+		for (Map.Entry<InputChannelInfo, Integer> entry : alignedController.getSequenceNumberInAnnouncedChannels().entrySet()) {
+			InputChannelInfo unProcessedChannelInfo = entry.getKey();
+			int announcedBarrierSequenceNumber = entry.getValue();
+			unalignedController.barrierAnnouncement(unProcessedChannelInfo, barrier, announcedBarrierSequenceNumber);
+		}
+
+		// get blocked channels before resuming consumption
+		Collection<InputChannelInfo> blockedChannels = alignedController.getBlockedChannels();
+
+		activeController = unalignedController;
+
+		// alignedController might have already processed some barriers, so "migrate"/forward those calls to unalignedController.
+		unalignedController.preProcessFirstBarrier(channelInfo, barrier);
+		for (InputChannelInfo blockedChannel : blockedChannels) {
+			unalignedController.barrierReceived(blockedChannel, barrier);
+		}
+
+		alignedController.resumeConsumption();
+	}
+
 	@Override
-	public boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException {
-		checkActiveController(barrier);
-		return activeController.postProcessLastBarrier(channelInfo, barrier);
+	public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
+		Optional<CheckpointBarrier> maybeTimeOut = maybeTimeOut(barrier);
+		if (maybeTimeOut.isPresent() && activeController == alignedController) {
+			switchToUnaligned(channelInfo, maybeTimeOut.get());
+			checkState(activeController == unalignedController);
+			checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
+			return maybeTimeOut;
+		}
+
+		barrier = maybeTimeOut.orElse(barrier);
+		if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
+			checkState(activeController == unalignedController);
+			checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
+			return Optional.empty();
+		}
+		else {
+			checkState(activeController == alignedController);
+			Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier(
+				channelInfo,
+				barrier);
+			checkState(triggerResult.isPresent());
+			return triggerResult;
+		}
 	}
 
 	@Override
@@ -94,4 +184,21 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier barrier) {
 		return isAligned(barrier) ? alignedController : unalignedController;
 	}
+
+	private Optional<CheckpointBarrier> maybeTimeOut(CheckpointBarrier barrier) {
+		CheckpointOptions options = barrier.getCheckpointOptions();
+		boolean shouldTimeout = (options.isTimeoutable()) && (
+			barrier.getId() == timeOutedBarrierId ||
+				(System.currentTimeMillis() - barrier.getTimestamp()) > options.getAlignmentTimeout());
+		if (options.isUnalignedCheckpoint() || !shouldTimeout) {
+			return Optional.empty();
+		}
+		else {
+			timeOutedBarrierId = Math.max(timeOutedBarrierId, barrier.getId());
+			return Optional.of(new CheckpointBarrier(
+				barrier.getId(),
+				barrier.getTimestamp(),
+				options.toTimeouted()));
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
index 5939451..2a3908a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Controls when the checkpoint should be actually triggered.
@@ -37,23 +38,28 @@ public interface CheckpointBarrierBehaviourController {
 	void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier);
 
 	/**
+	 * Invoked per every {@link CheckpointBarrier} announcement.
+	 */
+	void barrierAnnouncement(InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier, int sequenceNumber) throws IOException;
+
+	/**
 	 * Invoked per every received {@link CheckpointBarrier}.
 	 */
-	void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier);
+	Optional<CheckpointBarrier> barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;
 
 	/**
 	 * Invoked once per checkpoint, before the first invocation of
 	 * {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for that given checkpoint.
 	 * @return {@code true} if checkpoint should be triggered.
 	 */
-	boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;
+	Optional<CheckpointBarrier> preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;
 
 	/**
 	 * Invoked once per checkpoint, after the last invocation of
 	 * {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for that given checkpoint.
 	 * @return {@code true} if checkpoint should be triggered.
 	 */
-	boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException;
+	Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;
 
 	void abortPendingCheckpoint(long cancelledId, CheckpointException exception) throws IOException;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
index fa1ee00..b00b1fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
+import org.apache.flink.util.function.TriFunctionWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
@@ -116,21 +118,15 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 				markAlignmentStart(barrier.getTimestamp());
 			}
 			allBarriersReceivedFuture = new CompletableFuture<>();
-			try {
-				if (controller.preProcessFirstBarrier(channelInfo, barrier)) {
-					LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.",
-						taskName,
-						barrier.getId(),
-						barrier.getTimestamp());
-					notifyCheckpoint(barrier);
-				}
-			} catch (CheckpointException e) {
-				abortInternal(barrier.getId(), e);
+
+			if (!handleBarrier(barrier, channelInfo, CheckpointBarrierBehaviourController::preProcessFirstBarrier)) {
 				return;
 			}
 		}
 
-		controller.barrierReceived(channelInfo, barrier);
+		if (!handleBarrier(barrier, channelInfo, CheckpointBarrierBehaviourController::barrierReceived)) {
+			return;
+		}
 
 		if (currentCheckpointId == barrierId) {
 			if (++numBarriersReceived == numOpenChannels) {
@@ -139,25 +135,61 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 				}
 				numBarriersReceived = 0;
 				lastCancelledOrCompletedCheckpointId = currentCheckpointId;
-				if (controller.postProcessLastBarrier(channelInfo, barrier)) {
-					LOG.debug("{}: Triggering checkpoint {} on the last barrier at {}.",
-						taskName,
-						barrier.getId(),
-						barrier.getTimestamp());
-					notifyCheckpoint(barrier);
-				}
+				handleBarrier(barrier, channelInfo, CheckpointBarrierBehaviourController::postProcessLastBarrier);
 				allBarriersReceivedFuture.complete(null);
 			}
 		}
 	}
 
+	private boolean handleBarrier(
+			CheckpointBarrier barrier,
+			InputChannelInfo channelInfo,
+			TriFunctionWithException<
+				CheckpointBarrierBehaviourController,
+				InputChannelInfo,
+				CheckpointBarrier,
+				Optional<CheckpointBarrier>,
+				Exception
+			> controllerAction) throws IOException {
+		try {
+			Optional<CheckpointBarrier> triggerMaybe = controllerAction.apply(controller, channelInfo, barrier);
+			if (triggerMaybe.isPresent()) {
+				CheckpointBarrier trigger = triggerMaybe.get();
+				LOG.debug(
+					"{}: Triggering checkpoint {} on the barrier announcement at {}.",
+					taskName,
+					trigger.getId(),
+					trigger.getTimestamp());
+				notifyCheckpoint(trigger);
+			}
+			return true;
+		} catch (CheckpointException e) {
+			abortInternal(barrier.getId(), e);
+			return false;
+		} catch (RuntimeException | IOException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new IOException(e);
+		}
+	}
+
 	@Override
 	public void processBarrierAnnouncement(
 			CheckpointBarrier announcedBarrier,
 			int sequenceNumber,
 			InputChannelInfo channelInfo) throws IOException {
 		checkSubsumedCheckpoint(channelInfo, announcedBarrier);
-		// TODO: FLINK-19681
+
+		long barrierId = announcedBarrier.getId();
+		if (currentCheckpointId > barrierId || (currentCheckpointId == barrierId && !isCheckpointPending())) {
+			LOG.debug("{}: Obsolete announcement of checkpoint {} for channel {}.",
+					taskName,
+					barrierId,
+					channelInfo);
+			return;
+		}
+
+		controller.barrierAnnouncement(channelInfo, announcedBarrier, sequenceNumber);
 	}
 
 	private void checkSubsumedCheckpoint(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
index 9b7c80f..01bd056 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -128,6 +129,10 @@ public class StreamTaskSourceInput<T> implements StreamTaskInput<T>, Checkpointa
 	}
 
 	@Override
+	public void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException {
+	}
+
+	@Override
 	public int getInputIndex() {
 		return inputIndex;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
index 3d5f575..d19533e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Controller for unaligned checkpoints.
@@ -48,22 +49,33 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
 	}
 
 	@Override
-	public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
+	public void barrierAnnouncement(
+			InputChannelInfo channelInfo,
+			CheckpointBarrier announcedBarrier,
+			int sequenceNumber) throws IOException {
+		inputs[channelInfo.getGateIdx()].convertToPriorityEvent(
+			channelInfo.getInputChannelIdx(),
+			sequenceNumber);
+	}
+
+	@Override
+	public Optional<CheckpointBarrier> barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
+		return Optional.empty();
 	}
 
 	@Override
-	public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
+	public Optional<CheckpointBarrier> preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
 		checkpointCoordinator.initCheckpoint(barrier.getId(), barrier.getCheckpointOptions());
 		for (final CheckpointableInput input : inputs) {
 			input.checkpointStarted(barrier);
 		}
-		return true;
+		return Optional.of(barrier);
 	}
 
 	@Override
-	public boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
+	public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
 		resetPendingCheckpoint(barrier.getId());
-		return false;
+		return Optional.empty();
 	}
 
 	private void resetPendingCheckpoint(long cancelledId) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index 008ea9f..41b297d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -86,6 +86,11 @@ public class MailboxProcessor implements Closeable {
 
 	private Meter idleTime = new MeterView(new SimpleCounter());
 
+	@VisibleForTesting
+	public MailboxProcessor() {
+		this(MailboxDefaultAction.Controller::suspendDefaultAction);
+	}
+
 	public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
 		this(mailboxDefaultAction, StreamTaskActionExecutor.IMMEDIATE);
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
index f2bd319..b9eac08 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
+import org.hamcrest.collection.IsMapContaining;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,11 +51,16 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.streaming.runtime.io.UnalignedControllerTest.addSequence;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -143,6 +149,29 @@ public class AlignedControllerTest {
 	//  Tests
 	// ------------------------------------------------------------------------
 
+	public void testGetChannelsWithUnprocessedBarriers() throws IOException {
+		mockInputGate = new MockInputGate(4, Collections.emptyList());
+		AlignedController alignedController = new AlignedController(mockInputGate);
+		BufferOrEvent barrier0 = createBarrier(1, 0);
+		BufferOrEvent barrier1 = createBarrier(1, 1);
+		BufferOrEvent barrier3 = createBarrier(1, 3);
+		alignedController.barrierAnnouncement(barrier0.getChannelInfo(), (CheckpointBarrier) barrier0.getEvent(), 0);
+		alignedController.barrierReceived(barrier0.getChannelInfo(), (CheckpointBarrier) barrier0.getEvent());
+		alignedController.barrierAnnouncement(barrier1.getChannelInfo(), (CheckpointBarrier) barrier1.getEvent(), 1);
+		alignedController.barrierAnnouncement(barrier3.getChannelInfo(), (CheckpointBarrier) barrier3.getEvent(), 42);
+
+		Collection<InputChannelInfo> blockedChannels = alignedController.getBlockedChannels();
+		Map<InputChannelInfo, Integer> announcedChannels = alignedController.getSequenceNumberInAnnouncedChannels();
+
+		// blockedChannels and announcedChannels should be copies and shouldn't be cleared by the resumeConsumption
+		alignedController.resumeConsumption();
+
+		assertThat(blockedChannels, contains(barrier0.getChannelInfo()));
+		assertThat(announcedChannels, IsMapContaining.hasEntry(barrier1.getChannelInfo(), 1));
+		assertThat(announcedChannels, IsMapContaining.hasEntry(barrier3.getChannelInfo(), 42));
+		assertThat(announcedChannels.size(), equalTo(2));
+	}
+
 	/**
 	 * Validates that the buffer behaves correctly if no checkpoint barriers come,
 	 * for a single input channel.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index db958f3..8d81df2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -19,11 +19,18 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EventAnnouncement;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
@@ -32,12 +39,14 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static java.util.Collections.singletonList;
 import static junit.framework.TestCase.assertTrue;
@@ -45,8 +54,11 @@ import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -84,7 +96,7 @@ public class AlternatingControllerTest {
 	}
 
 	@Test
-	public void testAlignedTimeoutableCheckpoint() throws Exception {
+	public void testAlignedNeverTimeoutableCheckpoint() throws Exception {
 		int numChannels = 2;
 		int bufferSize = 1000;
 		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
@@ -95,14 +107,248 @@ public class AlternatingControllerTest {
 		Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE);
 		send(neverTimeoutableCheckpoint, gate, 0);
 		sendBuffer(bufferSize, gate, 1);
-
 		assertEquals(0, target.getTriggeredCheckpointCounter());
 
 		send(neverTimeoutableCheckpoint, gate, 1);
+		assertEquals(1, target.getTriggeredCheckpointCounter());
+	}
+
+	@Test
+	public void testTimeoutAlignment() throws Exception {
+		int numChannels = 2;
+		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels);
+
+		testTimeoutBarrierOnTwoChannels(target, gate);
+	}
+
+	@Test
+	public void testTimeoutAlignmentAfterProcessingBarrier() throws Exception {
+		int numChannels = 3;
+		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels);
 
+		long alignmentTimeout = 10;
+		long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout;
+		Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE);
+
+		RemoteInputChannel channel2 = (RemoteInputChannel) gate.getChannel(2);
+
+		channel2.onBuffer(neverTimeoutableCheckpoint.retainBuffer(), 0, 0);
+		while (gate.pollNext().isPresent()) {
+		}
+
+		assertEquals(0, target.getTriggeredCheckpointCounter());
+
+		testTimeoutBarrierOnTwoChannels(target, gate);
+	}
+
+	private void testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler target, CheckpointedInputGate gate) throws Exception {
+		int bufferSize = 1000;
+		long alignmentTimeout = 10;
+		long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout;
+		Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout);
+		Buffer buffer = TestBufferFactory.createBuffer(bufferSize);
+
+		RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0);
+		RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1);
+		channel0.onBuffer(buffer.retainBuffer(), 0, 0);
+		channel0.onBuffer(buffer.retainBuffer(), 1, 0);
+		channel0.onBuffer(checkpointBarrier.retainBuffer(), 2, 0);
+		channel1.onBuffer(buffer.retainBuffer(), 0, 0);
+		channel1.onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
+
+		assertEquals(0, target.getTriggeredCheckpointCounter());
+		// First announcements and prioritised barriers
+		List<AbstractEvent> events = new ArrayList<>();
+		events.add(gate.pollNext().get().getEvent());
+		events.add(gate.pollNext().get().getEvent());
+		events.add(gate.pollNext().get().getEvent());
+		events.add(gate.pollNext().get().getEvent());
+		assertThat(events, containsInAnyOrder(
+			instanceOf(EventAnnouncement.class),
+			instanceOf(EventAnnouncement.class),
+			instanceOf(CheckpointBarrier.class),
+			instanceOf(CheckpointBarrier.class)));
+		assertEquals(1, target.getTriggeredCheckpointCounter());
+		assertThat(
+			target.getTriggeredCheckpointOptions(),
+			contains(CheckpointOptions.create(
+				CHECKPOINT,
+				CheckpointStorageLocationReference.getDefault(),
+				true,
+				true,
+				0)));
+		// Followed by overtaken buffers
+		assertFalse(gate.pollNext().get().isEvent());
+		assertFalse(gate.pollNext().get().isEvent());
+		assertFalse(gate.pollNext().get().isEvent());
+	}
+
+	/**
+	 * This test tries to make sure that the first time out happens after processing
+	 * {@link EventAnnouncement} but before/during processing the first {@link CheckpointBarrier}.
+	 */
+	@Test
+	public void testTimeoutAlignmentOnFirstBarrier() throws Exception {
+		int numChannels = 2;
+		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels);
+
+		long alignmentTimeout = 100;
+		long checkpointCreationTime = System.currentTimeMillis();
+		Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout);
+
+		RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0);
+		RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1);
+		channel0.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+		channel1.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+
+		assertEquals(0, target.getTriggeredCheckpointCounter());
+		// First announcements and prioritised barriers
+		List<AbstractEvent> events = new ArrayList<>();
+		events.add(gate.pollNext().get().getEvent());
+		events.add(gate.pollNext().get().getEvent());
+
+		Thread.sleep(alignmentTimeout * 2);
+
+		events.add(gate.pollNext().get().getEvent());
+		assertThat(events, contains(
+			instanceOf(EventAnnouncement.class),
+			instanceOf(EventAnnouncement.class),
+			instanceOf(CheckpointBarrier.class)));
 		assertEquals(1, target.getTriggeredCheckpointCounter());
 	}
 
+	/**
+	 * First we process aligned {@link CheckpointBarrier} and after that we receive an already unaligned
+	 * {@link CheckpointBarrier}, that has timed out on an upstream task.
+	 */
+	@Test
+	public void testTimeoutAlignmentOnUnalignedCheckpoint() throws Exception {
+		int numChannels = 2;
+		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+		RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
+		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels, channelStateWriter);
+
+		long checkpointCreationTime = System.currentTimeMillis();
+		Buffer alignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, Integer.MAX_VALUE);
+		Buffer unalignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, 0);
+		Buffer buffer = TestBufferFactory.createBuffer(1000);
+
+		RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0);
+		RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1);
+		channel0.onBuffer(alignedCheckpointBarrier.retainBuffer(), 0, 0);
+
+		List<AbstractEvent> events = new ArrayList<>();
+		events.add(gate.pollNext().get().getEvent());
+		events.add(gate.pollNext().get().getEvent());
+
+		assertThat(events, contains(
+			instanceOf(EventAnnouncement.class),
+			instanceOf(CheckpointBarrier.class)));
+
+		channel1.onBuffer(buffer.retainBuffer(), 0, 0);
+		channel1.onBuffer(buffer.retainBuffer(), 1, 0);
+		channel1.onBuffer(unalignedCheckpointBarrier.retainBuffer(), 2, 0);
+
+		events.add(gate.pollNext().get().getEvent());
+
+		assertThat(events, contains(
+			instanceOf(EventAnnouncement.class),
+			instanceOf(CheckpointBarrier.class),
+			instanceOf(CheckpointBarrier.class)));
+
+		assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()).size(), 2);
+		assertEquals(1, target.getTriggeredCheckpointCounter());
+	}
+
+	@Test
+	public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws Exception {
+		testTimeoutAlignmentConsistency(true, false, false);
+	}
+
+	@Test
+	public void testTimeoutAlignmentConsistencyOnProcessBarrier() throws Exception {
+		testTimeoutAlignmentConsistency(false, true, false);
+	}
+
+	@Test
+	public void testTimeoutAlignmentConsistencyOnPostProcessBarrier() throws Exception {
+		testTimeoutAlignmentConsistency(false, false, true);
+	}
+
+	public void testTimeoutAlignmentConsistency(
+			boolean sleepBeforePreProcess,
+			boolean sleepBeforeProcess,
+			boolean sleepBeforePostProcess) throws Exception {
+		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+		SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(1).build();
+		TestInputChannel channel0 = new TestInputChannel(gate, 0, false, true);
+		gate.setInputChannels(channel0);
+
+		RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
+		AlternatingController controller = new AlternatingController(
+			new AlignedController(gate),
+			new UnalignedController(
+				new TestSubtaskCheckpointCoordinator(channelStateWriter),
+				gate));
+
+		long checkpointCreationTime = System.currentTimeMillis();
+		long alignmentTimeout = 10;
+		CheckpointBarrier barrier = checkpointBarrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout);
+
+		InputChannelInfo channelInfo = channel0.getChannelInfo();
+
+		controller.preProcessFirstBarrierOrAnnouncement(barrier);
+		controller.barrierAnnouncement(channelInfo, barrier, 1);
+
+		if (sleepBeforePreProcess) {
+			Thread.sleep(alignmentTimeout * 2);
+		}
+		Optional<CheckpointBarrier> preProcessTrigger = controller.preProcessFirstBarrier(channelInfo, barrier);
+		if (sleepBeforeProcess) {
+			Thread.sleep(alignmentTimeout * 2);
+		}
+		Optional<CheckpointBarrier> processTrigger = controller.barrierReceived(channelInfo, barrier);
+		if (sleepBeforePostProcess) {
+			Thread.sleep(alignmentTimeout * 2);
+		}
+		Optional<CheckpointBarrier> postProcessTrigger = controller.postProcessLastBarrier(channelInfo, barrier);
+
+		int triggeredCount = 0;
+		boolean unalignedCheckpoint = false;
+		if (preProcessTrigger.isPresent()) {
+			triggeredCount++;
+			unalignedCheckpoint = preProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
+			assertTrue(unalignedCheckpoint);
+		}
+		if (processTrigger.isPresent()) {
+			triggeredCount++;
+			unalignedCheckpoint = processTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
+			assertTrue(unalignedCheckpoint);
+		}
+		if (postProcessTrigger.isPresent()) {
+			triggeredCount++;
+			unalignedCheckpoint = postProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
+		}
+
+		assertEquals(
+			String.format(
+				"Checkpoint should be triggered exactly once, but [%s, %s, %s] was found instead",
+				preProcessTrigger.isPresent(),
+				processTrigger.isPresent(),
+				postProcessTrigger.isPresent()),
+			1,
+			triggeredCount);
+
+		if (unalignedCheckpoint) {
+			// check that we can add output data if we are in unaligned checkpoint mode. In other words
+			// if the state writer has been initialised correctly.
+			assertEquals(barrier.getId(), channelStateWriter.getLastStartedCheckpointId());
+		}
+	}
+
 	@Test
 	public void testMetricsAlternation() throws Exception {
 		int numChannels = 2;
@@ -362,6 +608,13 @@ public class AlternatingControllerTest {
 	}
 
 	private static SingleCheckpointBarrierHandler barrierHandler(SingleInputGate inputGate, AbstractInvokable target) {
+		return barrierHandler(inputGate, target, new RecordingChannelStateWriter());
+	}
+
+	private static SingleCheckpointBarrierHandler barrierHandler(
+			SingleInputGate inputGate,
+			AbstractInvokable target,
+			ChannelStateWriter stateWriter) {
 		String taskName = "test";
 		return new SingleCheckpointBarrierHandler(
 			taskName,
@@ -369,7 +622,7 @@ public class AlternatingControllerTest {
 			inputGate.getNumberOfInputChannels(),
 			new AlternatingController(
 				new AlignedController(inputGate),
-				new UnalignedController(TestSubtaskCheckpointCoordinator.INSTANCE, inputGate)));
+				new UnalignedController(new TestSubtaskCheckpointCoordinator(stateWriter), inputGate)));
 	}
 
 	private Buffer barrier(long barrierId, CheckpointType checkpointType) throws IOException {
@@ -381,18 +634,27 @@ public class AlternatingControllerTest {
 	}
 
 	private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) throws IOException {
+		CheckpointBarrier checkpointBarrier = checkpointBarrier(
+			barrierId,
+			checkpointType,
+			barrierTimestamp,
+			alignmentTimeout);
+		return toBuffer(
+			checkpointBarrier,
+			checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint());
+	}
+
+	private CheckpointBarrier checkpointBarrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) {
 		CheckpointOptions options = CheckpointOptions.create(
 			checkpointType,
 			CheckpointStorageLocationReference.getDefault(),
 			true,
 			true,
 			alignmentTimeout);
-		return toBuffer(
-			new CheckpointBarrier(
-				barrierId,
-				barrierTimestamp,
-				options),
-			options.isUnalignedCheckpoint());
+		return new CheckpointBarrier(
+			barrierId,
+			barrierTimestamp,
+			options);
 	}
 
 	private static CheckpointedInputGate buildGate(AbstractInvokable target, int numChannels) {
@@ -405,4 +667,30 @@ public class AlternatingControllerTest {
 		return new CheckpointedInputGate(gate, barrierHandler(gate, target), new SyncMailboxExecutor());
 	}
 
+	private static CheckpointedInputGate buildRemoteInputGate(
+			AbstractInvokable target,
+			int numChannels) throws IOException {
+		return buildRemoteInputGate(target, numChannels, new RecordingChannelStateWriter());
+	}
+
+	private static CheckpointedInputGate buildRemoteInputGate(
+			AbstractInvokable target,
+			int numChannels,
+			ChannelStateWriter channelStateWriter) throws IOException {
+		int maxUsedBuffers = 10;
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numChannels * maxUsedBuffers, 4096);
+		SingleInputGate gate = new SingleInputGateBuilder()
+			.setChannelFactory(InputChannelBuilder::buildRemoteChannel)
+			.setNumberOfChannels(numChannels)
+			.setSegmentProvider(networkBufferPool)
+			.setBufferPoolFactory(networkBufferPool.createBufferPool(numChannels, maxUsedBuffers))
+			.setChannelStateWriter(channelStateWriter)
+			.build();
+		gate.setup();
+		gate.requestPartitions();
+		// do not fire events automatically. If you need events, you should expose mailboxProcessor and
+		// execute it step by step
+		MailboxProcessor mailboxProcessor = new MailboxProcessor();
+		return new CheckpointedInputGate(gate, barrierHandler(gate, target, channelStateWriter), mailboxProcessor.getMainMailboxExecutor());
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
index f426289..ce36b39 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
@@ -46,7 +46,8 @@ public class ValidatingCheckpointHandler extends AbstractInvokable {
 	protected long abortedCheckpointCounter = 0;
 	protected CompletableFuture<Long> lastAlignmentDurationNanos;
 	protected CompletableFuture<Long> lastBytesProcessedDuringAlignment;
-	protected List<Long> triggeredCheckpoints = new ArrayList<>();
+	protected final List<Long> triggeredCheckpoints = new ArrayList<>();
+	protected final List<CheckpointOptions> triggeredCheckpointOptions = new ArrayList<>();
 
 	public ValidatingCheckpointHandler() {
 		this(-1);
@@ -89,6 +90,10 @@ public class ValidatingCheckpointHandler extends AbstractInvokable {
 		return lastBytesProcessedDuringAlignment;
 	}
 
+	public List<CheckpointOptions> getTriggeredCheckpointOptions() {
+		return triggeredCheckpointOptions;
+	}
+
 	@Override
 	public void invoke() {
 		throw new UnsupportedOperationException();
@@ -119,6 +124,7 @@ public class ValidatingCheckpointHandler extends AbstractInvokable {
 		lastBytesProcessedDuringAlignment = checkpointMetrics.getBytesProcessedDuringAlignment();
 
 		triggeredCheckpoints.add(checkpointMetaData.getCheckpointId());
+		triggeredCheckpointOptions.add(checkpointOptions);
 	}
 
 	@Override
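
Note on the handleBarrier refactoring in SingleCheckpointBarrierHandler earlier in this diff: each controller step now returns an Optional barrier to trigger, and one helper performs the trigger and the exception translation for all three steps. The following is only a minimal standalone sketch of that pattern. Every type in it (HandleBarrierSketch, DeclinedException, ThrowingTriFunction, Controller) is a hypothetical stand-in rather than a Flink class, and plain counters replace notifyCheckpoint and abortInternal.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical stand-ins for illustration; none of these are Flink classes.
final class HandleBarrierSketch {

    /** Plays the role of a recoverable "checkpoint declined" failure. */
    static final class DeclinedException extends Exception {
        DeclinedException(String message) {
            super(message);
        }
    }

    /** Three-argument function that may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingTriFunction<A, B, C, R> {
        R apply(A a, B b, C c) throws Exception;
    }

    /** Each controller step optionally returns the id of a checkpoint to trigger. */
    interface Controller {
        Optional<Long> firstBarrier(int channel, long barrierId) throws Exception;

        Optional<Long> barrierReceived(int channel, long barrierId) throws Exception;
    }

    private final Controller controller;
    int triggered;
    int aborted;

    HandleBarrierSketch(Controller controller) {
        this.controller = controller;
    }

    /** Returns false if the step declined the checkpoint, so the caller should stop. */
    boolean handle(
            int channel,
            long barrierId,
            ThrowingTriFunction<Controller, Integer, Long, Optional<Long>> step) throws IOException {
        try {
            // A present value means "trigger this checkpoint now".
            if (step.apply(controller, channel, barrierId).isPresent()) {
                triggered++;
            }
            return true;
        } catch (DeclinedException e) {
            aborted++; // the handler in the diff aborts the checkpoint at this point
            return false;
        } catch (RuntimeException | IOException e) {
            throw e; // pass through unchanged
        } catch (Exception e) {
            throw new IOException(e); // wrap any other checked exception
        }
    }

    /** Runs one triggering step and one declining step; returns {triggered, aborted}. */
    static int[] demo() {
        Controller controller = new Controller() {
            @Override
            public Optional<Long> firstBarrier(int channel, long barrierId) {
                return Optional.of(barrierId);
            }

            @Override
            public Optional<Long> barrierReceived(int channel, long barrierId) throws Exception {
                throw new DeclinedException("declined");
            }
        };
        HandleBarrierSketch handler = new HandleBarrierSketch(controller);
        try {
            if (handler.handle(0, 1L, Controller::firstBarrier)) {
                handler.handle(0, 1L, Controller::barrierReceived);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new int[] {handler.triggered, handler.aborted};
    }
}
```

Passing the step as an unbound method reference (Controller::firstBarrier) keeps the call sites to one line each, which is roughly what the diff achieves with CheckpointBarrierBehaviourController::preProcessFirstBarrier and its siblings.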


[flink] 15/18: [FLINK-19681][checkpointing] Switch controller before processing the first barrier

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fdbbfcec892222a3cc57f9b5d01a8cab9472d0d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Dec 3 11:54:29 2020 +0100

    [FLINK-19681][checkpointing] Switch controller before processing the first barrier
    
    If a checkpoint announcement was processed and then UC-barrier arrives
    (from the upstream) then it should be processed by the UC controller.
---
 .../streaming/runtime/io/AlternatingController.java  |  1 +
 .../runtime/io/AlternatingControllerTest.java        | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)
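
Note: the behaviour described in the commit message can be pictured with a small standalone model. It is a sketch under stated assumptions, not the AlternatingController implementation. The Mode enum, the Barrier class and the method names are hypothetical, and the only rule modelled is "re-select the controller from the barrier that actually arrives".

```java
// Hypothetical model of controller selection; names do not mirror Flink's classes.
final class ControllerSwitchSketch {

    enum Mode { ALIGNED, UNALIGNED }

    /** Only the barrier fields this illustration needs. */
    static final class Barrier {
        final long id;
        final boolean unaligned;

        Barrier(long id, boolean unaligned) {
            this.id = id;
            this.unaligned = unaligned;
        }

        /** Same checkpoint id, but marked as unaligned (e.g. timed out upstream). */
        Barrier asUnaligned() {
            return new Barrier(id, true);
        }
    }

    private Mode active = Mode.ALIGNED;

    private static Mode choose(Barrier barrier) {
        return barrier.unaligned ? Mode.UNALIGNED : Mode.ALIGNED;
    }

    /** The announcement selects a controller based on what was announced. */
    void onAnnouncement(Barrier announced) {
        active = choose(announced);
    }

    /**
     * The first real barrier re-selects the controller, because it may differ
     * from the announcement that was processed earlier.
     */
    Mode onFirstBarrier(Barrier barrier) {
        active = choose(barrier);
        return active;
    }

    Mode active() {
        return active;
    }
}
```

In this model, an aligned announcement followed by an unaligned barrier for the same checkpoint ends in Mode.UNALIGNED. Without the re-selection in onFirstBarrier it would stay in Mode.ALIGNED, which is the situation the one-line change to AlternatingController addresses.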

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 7d8761a..e3f816c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -113,6 +113,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 			lastSeenBarrier = barrier.getId();
 			firstBarrierArrivalTime = getArrivalTime(barrier);
 		}
+		activeController = chooseController(barrier);
 		return activeController.preProcessFirstBarrier(channelInfo, barrier);
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index 34485f5..d998649 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -105,6 +105,26 @@ public class AlternatingControllerTest {
 		assertFalse(stateWriter.getAddedInput().isEmpty());
 	}
 
+	/**
+	 * If a checkpoint announcement was processed from one channel and then UC-barrier arrives
+	 * on another channel, this UC barrier should be processed by the UC controller.
+	 */
+	@Test
+	public void testSwitchToUnalignedByUpstream() throws Exception {
+		SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+		inputGate.setInputChannels(new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1));
+		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+		SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);
+		CheckpointedInputGate gate = buildGate(target, 2);
+
+		CheckpointBarrier aligned = new CheckpointBarrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE));
+
+		send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, gate); // process announcement but not the barrier
+		assertEquals(0, target.triggeredCheckpointCounter);
+		send(toBuffer(aligned.asUnaligned(), true), 1, gate); // pretend it came from upstream before the first (AC) barrier was picked up
+		assertEquals(1, target.triggeredCheckpointCounter);
+	}
+
 	@Test
 	public void testCheckpointHandling() throws Exception {
 		testBarrierHandling(CHECKPOINT);


[flink] 17/18: [hotfix][checkpointing] Explicit creation of CheckpointOptions

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 26ca3f6d8af862f30d71ebcf9d0b22eab149ba85
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Dec 2 14:20:26 2020 +0100

    [hotfix][checkpointing] Explicit creation of CheckpointOptions
    
    The motivation is to eliminate subtle bugs when changing checkpoint
    type on the fly.
    1. Only guess options when creating a new barrier from configuration
    2. For other cases provide explicit factory methods
    3. Carry the current checkpoint/barrier requirements instead of the
    initial configuration.
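
    The decision order described above can be sketched in isolation. The
    class below is a simplified, hypothetical stand-in written for
    illustration only: CheckpointOptionsSketch, its boolean "savepoint"
    flag and the value chosen for NO_ALIGNMENT_TIME_OUT are assumptions,
    and the storage location is left out. It mirrors the branches of
    forConfig and the constructor preconditions in the diff below; it is
    not the Flink class itself.

```java
/** Hypothetical, simplified stand-in for CheckpointOptions (illustration only). */
final class CheckpointOptionsSketch {
    // Assumed sentinel meaning "no alignment timeout"; the concrete value is an assumption.
    static final long NO_ALIGNMENT_TIME_OUT = Long.MAX_VALUE;

    final boolean savepoint;      // stands in for CheckpointType
    final boolean exactlyOnce;
    final boolean isUnaligned;
    final long alignmentTimeout;

    private CheckpointOptionsSketch(
            boolean savepoint, boolean exactlyOnce, boolean isUnaligned, long alignmentTimeout) {
        // Same invariants as the preconditions added to the constructor in the diff.
        if (isUnaligned && savepoint) {
            throw new IllegalArgumentException("Savepoint can't be unaligned");
        }
        if (isUnaligned && alignmentTimeout != NO_ALIGNMENT_TIME_OUT) {
            throw new IllegalArgumentException("Unaligned checkpoint can't have timeout");
        }
        this.savepoint = savepoint;
        this.exactlyOnce = exactlyOnce;
        this.isUnaligned = isUnaligned;
        this.alignmentTimeout = alignmentTimeout;
    }

    // Explicit factories: each one fixes every flag, so nothing is guessed.
    static CheckpointOptionsSketch notExactlyOnce(boolean savepoint) {
        return new CheckpointOptionsSketch(savepoint, false, false, NO_ALIGNMENT_TIME_OUT);
    }

    static CheckpointOptionsSketch alignedNoTimeout(boolean savepoint) {
        return new CheckpointOptionsSketch(savepoint, true, false, NO_ALIGNMENT_TIME_OUT);
    }

    static CheckpointOptionsSketch unaligned() {
        return new CheckpointOptionsSketch(false, true, true, NO_ALIGNMENT_TIME_OUT);
    }

    static CheckpointOptionsSketch alignedWithTimeout(long alignmentTimeout) {
        return new CheckpointOptionsSketch(false, true, false, alignmentTimeout);
    }

    // The only place where options are derived ("guessed") from configuration.
    static CheckpointOptionsSketch forConfig(
            boolean savepoint, boolean exactlyOnce, boolean unalignedEnabled, long alignmentTimeout) {
        if (!exactlyOnce) {
            return notExactlyOnce(savepoint);
        } else if (savepoint || !unalignedEnabled) {
            return alignedNoTimeout(savepoint);
        } else if (alignmentTimeout == 0 || alignmentTimeout == NO_ALIGNMENT_TIME_OUT) {
            return unaligned();
        } else {
            return alignedWithTimeout(alignmentTimeout);
        }
    }

    boolean isTimeoutable() {
        return exactlyOnce && !isUnaligned
            && alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT;
    }

    // Converting an existing barrier's options never consults the configuration again.
    CheckpointOptionsSketch toUnaligned() {
        if (isUnaligned) {
            throw new IllegalStateException("already unaligned");
        }
        return unaligned();
    }

    public static void main(String[] args) {
        CheckpointOptionsSketch options = forConfig(false, true, true, 10);
        System.out.println("timeoutable: " + options.isTimeoutable());
        System.out.println("unaligned after conversion: " + options.toUnaligned().isUnaligned);
    }
}
```

    In this sketch, a caller that already knows which kind of barrier it
    needs uses one of the explicit factories, and only code that starts
    from job configuration goes through forConfig.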
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   2 +-
 .../runtime/checkpoint/CheckpointOptions.java      |  89 +++---
 .../runtime/io/network/api/CheckpointBarrier.java  |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   5 +-
 .../runtime/checkpoint/CheckpointOptionsTest.java  |  53 +---
 .../api/serialization/EventSerializerTest.java     |   8 +-
 .../PipelinedSubpartitionWithReadViewTest.java     |  15 +-
 .../partition/consumer/LocalInputChannelTest.java  |   3 +-
 .../partition/consumer/RemoteInputChannelTest.java |  32 +--
 .../runtime/tasks/SourceOperatorStreamTask.java    |   6 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |   6 +-
 .../runtime/io/AlternatingControllerTest.java      | 303 +++++++++------------
 .../runtime/io/InputProcessorUtilTest.java         |   3 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |   2 +-
 .../io/UnalignedControllerCancellationTest.java    |   2 +-
 .../runtime/io/UnalignedControllerTest.java        |   8 +-
 ...tStreamTaskChainedSourcesCheckpointingTest.java |   2 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   5 +-
 18 files changed, 249 insertions(+), 297 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9154989..27635c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -778,7 +778,7 @@ public class CheckpointCoordinator {
 		Execution[] executions,
 		boolean advanceToEndOfTime) {
 
-		final CheckpointOptions checkpointOptions = CheckpointOptions.create(
+		final CheckpointOptions checkpointOptions = CheckpointOptions.forConfig(
 			props.getCheckpointType(),
 			checkpointStorageLocation.getLocationReference(),
 			isExactlyOnceMode,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 1f12e7f..a788abd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -25,11 +25,13 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import java.io.Serializable;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Options for performing the checkpoint.
+ * Options for performing the checkpoint. Note that different
+ * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier barriers} may have different options.
  *
  * <p>The {@link CheckpointProperties} are related and cover properties that
  * are only relevant at the {@link CheckpointCoordinator}. These options are
@@ -53,19 +55,59 @@ public class CheckpointOptions implements Serializable {
 
 	private final long alignmentTimeout;
 
-	public static CheckpointOptions create(
+	public static CheckpointOptions notExactlyOnce(CheckpointType type, CheckpointStorageLocationReference location) {
+		return new CheckpointOptions(
+			type,
+			location,
+			false,
+			false,
+			NO_ALIGNMENT_TIME_OUT);
+	}
+
+	public static CheckpointOptions alignedNoTimeout(CheckpointType type, CheckpointStorageLocationReference location) {
+		return new CheckpointOptions(
+			type,
+			location,
+			true,
+			false,
+			NO_ALIGNMENT_TIME_OUT);
+	}
+
+	public static CheckpointOptions unaligned(CheckpointStorageLocationReference location) {
+		return new CheckpointOptions(
+			CheckpointType.CHECKPOINT,
+			location,
+			true,
+			true,
+			NO_ALIGNMENT_TIME_OUT);
+	}
+
+	public static CheckpointOptions alignedWithTimeout(CheckpointStorageLocationReference location, long alignmentTimeout) {
+		return new CheckpointOptions(
+			CheckpointType.CHECKPOINT,
+			location,
+			true,
+			false,
+			alignmentTimeout);
+	}
+
+	public static CheckpointOptions forConfig(
 			CheckpointType checkpointType,
 			CheckpointStorageLocationReference locationReference,
 			boolean isExactlyOnceMode,
-			boolean unalignedCheckpointsEnabled,
+			boolean isUnalignedEnabled,
 			long alignmentTimeout) {
-		boolean canBeUnaligned = checkpointType == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled;
-		return new CheckpointOptions(
-			checkpointType,
-			locationReference,
-			isExactlyOnceMode,
-			canBeUnaligned && alignmentTimeout == 0,
-			canBeUnaligned ? alignmentTimeout : NO_ALIGNMENT_TIME_OUT);
+		if (!isExactlyOnceMode) {
+			return notExactlyOnce(checkpointType, locationReference);
+		} else if (checkpointType.isSavepoint()) {
+			return alignedNoTimeout(checkpointType, locationReference);
+		} else if (!isUnalignedEnabled) {
+			return alignedNoTimeout(checkpointType, locationReference);
+		} else if (alignmentTimeout == 0 || alignmentTimeout == NO_ALIGNMENT_TIME_OUT) {
+			return unaligned(locationReference);
+		} else {
+			return alignedWithTimeout(locationReference, alignmentTimeout);
+		}
 	}
 
 	@VisibleForTesting
@@ -82,6 +124,8 @@ public class CheckpointOptions implements Serializable {
 			boolean isUnalignedCheckpoint,
 			long alignmentTimeout) {
 
+		checkArgument(!isUnalignedCheckpoint || !checkpointType.isSavepoint(), "Savepoint can't be unaligned");
+		checkArgument(alignmentTimeout == NO_ALIGNMENT_TIME_OUT || !isUnalignedCheckpoint, "Unaligned checkpoint can't have timeout (%s)", alignmentTimeout);
 		this.checkpointType = checkNotNull(checkpointType);
 		this.targetLocation = checkNotNull(targetLocation);
 		this.isExactlyOnceMode = isExactlyOnceMode;
@@ -98,7 +142,7 @@ public class CheckpointOptions implements Serializable {
 	}
 
 	public boolean isTimeoutable() {
-		return !isUnalignedCheckpoint && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT);
+		return isExactlyOnceMode && !isUnalignedCheckpoint && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT);
 	}
 
 	// ------------------------------------------------------------------------
@@ -178,25 +222,8 @@ public class CheckpointOptions implements Serializable {
 		return CHECKPOINT_AT_DEFAULT_LOCATION;
 	}
 
-	public static CheckpointOptions forCheckpointWithDefaultLocation(
-			boolean isExactlyOnceMode,
-			boolean isUnalignedCheckpoint,
-			long alignmentTimeout) {
-		return new CheckpointOptions(
-			CheckpointType.CHECKPOINT,
-			CheckpointStorageLocationReference.getDefault(),
-			isExactlyOnceMode,
-			isUnalignedCheckpoint,
-			alignmentTimeout);
-	}
-
-	public CheckpointOptions asTimedOut() {
-		checkState(checkpointType == CheckpointType.CHECKPOINT);
-		return create(
-			checkpointType,
-			targetLocation,
-			isExactlyOnceMode,
-			true,
-			0);
+	public CheckpointOptions toUnaligned() {
+		checkState(!isUnalignedCheckpoint);
+		return unaligned(targetLocation);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index e734616..e7e78e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -119,6 +119,6 @@ public class CheckpointBarrier extends RuntimeEvent {
 	}
 
 	public CheckpointBarrier asUnaligned() {
-		return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut());
+		return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toUnaligned());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 128b74c..d5b0851 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2620,7 +2620,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		return new CheckpointCoordinatorBuilder()
 			.setJobId(jobId)
 			.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
-			.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
+			.setCheckpointCoordinatorConfiguration(
+				CheckpointCoordinatorConfiguration.builder()
+					.setAlignmentTimeout(Long.MAX_VALUE)
+					.setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
 			.setTimer(manuallyTriggeredScheduledExecutor)
 			.build();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
index b443e8a..a1485ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import java.util.Random;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static org.junit.Assert.assertArrayEquals;
@@ -63,64 +64,40 @@ public class CheckpointOptionsTest {
 		assertArrayEquals(locationBytes, copy.getTargetLocation().getReferenceBytes());
 	}
 
-	@Test
+	@Test(expected = IllegalArgumentException.class)
 	public void testSavepointNeedsAlignment() {
-		CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
-		assertTrue(new CheckpointOptions(SAVEPOINT, location, true, true, 0).needsAlignment());
-		assertFalse(new CheckpointOptions(SAVEPOINT, location, false, true, 0).needsAlignment());
-		assertTrue(new CheckpointOptions(SAVEPOINT, location, true, false, 0).needsAlignment());
-		assertFalse(new CheckpointOptions(SAVEPOINT, location, false, false, 0).needsAlignment());
+		new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault(), true, true, 0);
 	}
 
 	@Test
 	public void testCheckpointNeedsAlignment() {
 		CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
-		assertFalse(new CheckpointOptions(CHECKPOINT, location, true, true, 0).needsAlignment());
-		assertTrue(new CheckpointOptions(CHECKPOINT, location, true, false, 0).needsAlignment());
-		assertFalse(new CheckpointOptions(CHECKPOINT, location, false, true, 0).needsAlignment());
-		assertFalse(new CheckpointOptions(CHECKPOINT, location, false, false, 0).needsAlignment());
+		assertFalse(new CheckpointOptions(CHECKPOINT, location, true, true, Long.MAX_VALUE).needsAlignment());
+		assertTrue(new CheckpointOptions(CHECKPOINT, location, true, false, Long.MAX_VALUE).needsAlignment());
+		assertFalse(new CheckpointOptions(CHECKPOINT, location, false, true, Long.MAX_VALUE).needsAlignment());
+		assertFalse(new CheckpointOptions(CHECKPOINT, location, false, false, Long.MAX_VALUE).needsAlignment());
 	}
 
 	@Test
 	public void testCheckpointIsTimeoutable() {
 		CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
 		assertTimeoutable(
-			CheckpointOptions.create(
-				CHECKPOINT,
-				location,
-				true,
-				true,
-				10),
+			CheckpointOptions.alignedWithTimeout(location, 10),
 			false,
 			true,
 			10);
 		assertTimeoutable(
-			CheckpointOptions.create(
-				CHECKPOINT,
-				location,
-				true,
-				false,
-				10),
-			false,
-			false,
-			CheckpointOptions.NO_ALIGNMENT_TIME_OUT);
-		assertTimeoutable(
-			CheckpointOptions.create(
-				CHECKPOINT,
-				location,
-				true,
-				true,
-				0),
+			CheckpointOptions.unaligned(location),
 			true,
 			false,
-			0);
+			NO_ALIGNMENT_TIME_OUT);
 	}
 
 	private void assertTimeoutable(CheckpointOptions options, boolean isUnaligned, boolean isTimeoutable, long timeout) {
-		assertTrue(options.isExactlyOnceMode());
-		assertEquals(!isUnaligned, options.needsAlignment());
-		assertEquals(isUnaligned, options.isUnalignedCheckpoint());
-		assertEquals(isTimeoutable, options.isTimeoutable());
-		assertEquals(timeout, options.getAlignmentTimeout());
+		assertTrue("exactly once", options.isExactlyOnceMode());
+		assertEquals("need alignment", !isUnaligned, options.needsAlignment());
+		assertEquals("unaligned", isUnaligned, options.isUnalignedCheckpoint());
+		assertEquals("timeoutable", isTimeoutable, options.isTimeoutable());
+		assertEquals("timeout", timeout, options.getAlignmentTimeout());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ee97672..06c375f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -55,12 +54,7 @@ public class EventSerializerTest {
 		new EventAnnouncement(new CheckpointBarrier(
 			42L,
 			1337L,
-			CheckpointOptions.create(
-				CheckpointType.CHECKPOINT,
-				CheckpointStorageLocationReference.getDefault(),
-				true,
-				true,
-				10)),
+			CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 10)),
 			44)
 	};
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index 38cd22f..b2a4c23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -338,12 +337,7 @@ public class PipelinedSubpartitionWithReadViewTest {
 		assertEquals(1, availablityListener.getNumNotifications());
 		assertEquals(0, availablityListener.getNumPriorityEvents());
 
-		CheckpointOptions options = new CheckpointOptions(
-			CheckpointType.CHECKPOINT,
-			new CheckpointStorageLocationReference(new byte[]{0, 1, 2}),
-			true,
-			true,
-			0);
+		CheckpointOptions options = CheckpointOptions.unaligned(new CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
 		channelStateWriter.start(0, options);
 		BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true);
 		subpartition.add(barrierBuffer);
@@ -366,12 +360,7 @@ public class PipelinedSubpartitionWithReadViewTest {
 	public void testAvailabilityAfterPriority() throws Exception {
 		subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
 
-		CheckpointOptions options = new CheckpointOptions(
-			CheckpointType.CHECKPOINT,
-			new CheckpointStorageLocationReference(new byte[]{0, 1, 2}),
-			true,
-			true,
-			0);
+		CheckpointOptions options = CheckpointOptions.unaligned(new CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
 		BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true);
 		subpartition.add(barrierBuffer);
 		assertEquals(1, availablityListener.getNumNotifications());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 8785a68..7b69780 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -501,7 +500,7 @@ public class LocalInputChannelTest {
 		channel.requestSubpartition(0);
 
 		final CheckpointStorageLocationReference location = getDefault();
-		CheckpointOptions options = new CheckpointOptions(CheckpointType.CHECKPOINT, location, true, true, 0);
+		CheckpointOptions options = CheckpointOptions.unaligned(location);
 		stateWriter.start(0, options);
 
 		final CheckpointBarrier barrier = new CheckpointBarrier(0, 123L, options);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 2954a3f..d13cb72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
@@ -83,6 +82,7 @@ import static org.apache.flink.runtime.io.network.partition.AvailabilityUtil.ass
 import static org.apache.flink.runtime.io.network.partition.AvailabilityUtil.assertPriorityAvailability;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -107,7 +107,9 @@ import static org.mockito.Mockito.when;
  */
 public class RemoteInputChannelTest {
 
-	public static final long CHECKPOINT_ID = 1L;
+	private static final long CHECKPOINT_ID = 1L;
+	private static final CheckpointOptions UNALIGNED = CheckpointOptions.unaligned(getDefault());
+	private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = CheckpointOptions.alignedWithTimeout(getDefault(), 10);
 
 	@Test
 	public void testExceptionOnReordering() throws Exception {
@@ -1136,7 +1138,7 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
-		sendBarrier(channel, sequenceNumber++, 0);
+		sendBarrier(channel, sequenceNumber++, UNALIGNED);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 
@@ -1150,7 +1152,7 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
-		sendBarrier(channel, sequenceNumber++, 0);
+		sendBarrier(channel, sequenceNumber++, UNALIGNED);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		assertInflightBufferSizes(channel, 1, 2);
@@ -1176,7 +1178,7 @@ public class RemoteInputChannelTest {
 			int sequenceNumber = startingSequence;
 			sendBuffer(channel, sequenceNumber++, bufferSize++);
 			sendBuffer(channel, sequenceNumber++, bufferSize++);
-			sendBarrier(channel, sequenceNumber++, 0);
+			sendBarrier(channel, sequenceNumber++, UNALIGNED);
 			sendBuffer(channel, sequenceNumber++, bufferSize++);
 			sendBuffer(channel, sequenceNumber++, bufferSize++);
 			assertThat(
@@ -1193,7 +1195,7 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
-		sendBarrier(channel, sequenceNumber++, 0);
+		sendBarrier(channel, sequenceNumber++, UNALIGNED);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		assertGetNextBufferSequenceNumbers(channel, 2, 0);
@@ -1213,7 +1215,7 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
-		sendBarrier(channel, sequenceNumber++, 10);
+		sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 
@@ -1239,7 +1241,7 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
-		sendBarrier(channel, sequenceNumber++, 10);
+		sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		assertInflightBufferSizes(channel, 1, 2);
@@ -1252,7 +1254,7 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
-		sendBarrier(channel, sequenceNumber++, 10);
+		sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		assertGetNextBufferSequenceNumbers(channel, 2);
@@ -1266,20 +1268,14 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
-		sendBarrier(channel, sequenceNumber++, 10);
+		sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		sendBuffer(channel, sequenceNumber++, bufferSize++);
 		assertGetNextBufferSequenceNumbers(channel, 2, 0);
 		assertInflightBufferSizes(channel, 2);
 	}
 
-	private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, int alignmentTimeout) throws IOException {
-		CheckpointOptions checkpointOptions = CheckpointOptions.create(
-			CHECKPOINT,
-			CheckpointStorageLocationReference.getDefault(),
-			true,
-			true,
-			alignmentTimeout);
+	private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, CheckpointOptions checkpointOptions) throws IOException {
 		send(
 			channel,
 			sequenceNumber,
@@ -1438,7 +1434,7 @@ public class RemoteInputChannelTest {
 		SingleInputGate inputGate = new SingleInputGateBuilder().build();
 		RemoteInputChannel channel = InputChannelTestUtils.createRemoteInputChannel(inputGate, 0);
 
-		CheckpointOptions options = new CheckpointOptions(CHECKPOINT, CheckpointStorageLocationReference.getDefault());
+		CheckpointOptions options = new CheckpointOptions(CHECKPOINT, getDefault());
 		assertPriorityAvailability(inputGate, false, false, () ->
 			assertAvailability(inputGate, false, true, () -> {
 				channel.onBuffer(toBuffer(new CheckpointBarrier(1L, 123L, options), false), 0, 0);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 1f91062..ec20fdf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -23,7 +23,9 @@ import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -131,7 +133,9 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
 	// --------------------------
 
 	private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-		final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(
+		final CheckpointOptions checkpointOptions = CheckpointOptions.forConfig(
+			CheckpointType.CHECKPOINT,
+			CheckpointStorageLocationReference.getDefault(),
 			configuration.isExactlyOnceCheckpointMode(),
 			configuration.isUnalignedCheckpointsEnabled(),
 			configuration.getAlignmentTimeout());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index afd67f3..5b10df5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -90,7 +92,9 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 					// TODO - we need to see how to derive those. We should probably not encode this in the
 					// TODO -   source's trigger message, but do a handshake in this task between the trigger
 					// TODO -   message from the master, and the source's trigger notification
-					final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(
+					final CheckpointOptions checkpointOptions = CheckpointOptions.forConfig(
+						CheckpointType.CHECKPOINT,
+						CheckpointStorageLocationReference.getDefault(),
 						configuration.isExactlyOnceCheckpointMode(),
 						configuration.isUnalignedCheckpointsEnabled(),
 						configuration.getAlignmentTimeout());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index d998649..a7be650 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -23,20 +23,19 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EventAnnouncement;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
-import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
@@ -50,15 +49,18 @@ import java.util.Optional;
 
 import static java.util.Collections.singletonList;
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout;
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedWithTimeout;
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -146,7 +148,7 @@ public class AlternatingControllerTest {
 			barriers.add(barrier);
 			CheckpointType type = barrier % 2 == 0 ? CHECKPOINT : SAVEPOINT;
 			for (int channel = 0; channel < numChannels; channel++) {
-				sendBarrier(barrier, type, (TestInputChannel) gate.getChannel(channel), gate);
+				send(barrier(barrier, System.currentTimeMillis(), alignedNoTimeout(type, getDefault())).retainBuffer(), channel, gate);
 			}
 		}
 		assertEquals(barriers, target.triggeredCheckpoints);
@@ -155,28 +157,22 @@ public class AlternatingControllerTest {
 	@Test
 	public void testAlignedNeverTimeoutableCheckpoint() throws Exception {
 		int numChannels = 2;
-		int bufferSize = 1000;
 		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
 		CheckpointedInputGate gate = buildGate(target, numChannels);
 
-		long checkpointCreationTime = System.currentTimeMillis();
-		// Aligned checkpoint that never times out
-		Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE);
-		send(neverTimeoutableCheckpoint, gate, 0);
-		sendBuffer(bufferSize, gate, 1);
+		Buffer neverTimeoutableCheckpoint = withTimeout(Integer.MAX_VALUE);
+		send(neverTimeoutableCheckpoint, 0, gate);
+		sendData(1000, 1, gate);
 		assertEquals(0, target.getTriggeredCheckpointCounter());
 
-		send(neverTimeoutableCheckpoint, gate, 1);
+		send(neverTimeoutableCheckpoint, 1, gate);
 		assertEquals(1, target.getTriggeredCheckpointCounter());
 	}
 
 	@Test
 	public void testTimeoutAlignment() throws Exception {
-		int numChannels = 2;
 		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
-		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels);
-
-		testTimeoutBarrierOnTwoChannels(target, gate);
+		testTimeoutBarrierOnTwoChannels(target, buildRemoteInputGate(target, 2));
 	}
 
 	@Test
@@ -185,15 +181,7 @@ public class AlternatingControllerTest {
 		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
 		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels);
 
-		long alignmentTimeout = 10;
-		long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout;
-		Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE);
-
-		RemoteInputChannel channel2 = (RemoteInputChannel) gate.getChannel(2);
-
-		channel2.onBuffer(neverTimeoutableCheckpoint.retainBuffer(), 0, 0);
-		while (gate.pollNext().isPresent()) {
-		}
+		send(barrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), 2, gate);
 
 		assertEquals(0, target.getTriggeredCheckpointCounter());
 
@@ -201,46 +189,31 @@ public class AlternatingControllerTest {
 	}
 
 	private void testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler target, CheckpointedInputGate gate) throws Exception {
-		int bufferSize = 1000;
 		long alignmentTimeout = 10;
-		long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout;
-		Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout);
-		Buffer buffer = TestBufferFactory.createBuffer(bufferSize);
-
-		RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0);
-		RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1);
-		channel0.onBuffer(buffer.retainBuffer(), 0, 0);
-		channel0.onBuffer(buffer.retainBuffer(), 1, 0);
-		channel0.onBuffer(checkpointBarrier.retainBuffer(), 2, 0);
-		channel1.onBuffer(buffer.retainBuffer(), 0, 0);
-		channel1.onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
+		Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+
+		getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
+		getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
+		getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 2, 0);
+		getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
+		getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
 
 		assertEquals(0, target.getTriggeredCheckpointCounter());
-		// First announcements and prioritsed barriers
-		List<AbstractEvent> events = new ArrayList<>();
-		events.add(gate.pollNext().get().getEvent());
+		assertAnnouncement(gate);
 		Thread.sleep(alignmentTimeout * 2);
-		events.add(gate.pollNext().get().getEvent());
-		events.add(gate.pollNext().get().getEvent());
-		events.add(gate.pollNext().get().getEvent());
-		assertThat(events, containsInAnyOrder(
-			instanceOf(EventAnnouncement.class),
-			instanceOf(EventAnnouncement.class),
-			instanceOf(CheckpointBarrier.class),
-			instanceOf(CheckpointBarrier.class)));
+		assertAnnouncement(gate);
+		assertBarrier(gate);
+		assertBarrier(gate);
 		assertEquals(1, target.getTriggeredCheckpointCounter());
-		assertThat(
-			target.getTriggeredCheckpointOptions(),
-			contains(CheckpointOptions.create(
-				CHECKPOINT,
-				CheckpointStorageLocationReference.getDefault(),
-				true,
-				true,
-				0)));
+		assertThat(target.getTriggeredCheckpointOptions(), contains(unaligned(getDefault())));
 		// Followed by overtaken buffers
-		assertFalse(gate.pollNext().get().isEvent());
-		assertFalse(gate.pollNext().get().isEvent());
-		assertFalse(gate.pollNext().get().isEvent());
+		assertData(gate);
+		assertData(gate);
+		assertData(gate);
+	}
+
+	private Buffer dataBuffer() {
+		return createBuffer(100).retainBuffer();
 	}
 
 	/**
@@ -254,27 +227,21 @@ public class AlternatingControllerTest {
 		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels);
 
 		long alignmentTimeout = 100;
-		long checkpointCreationTime = System.currentTimeMillis();
-		Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout);
+		Buffer checkpointBarrier = withTimeout(alignmentTimeout);
 
-		RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0);
-		RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1);
-		channel0.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
-		channel1.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+		for (int i = 0; i < numChannels; i++) {
+			(getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+		}
 
 		assertEquals(0, target.getTriggeredCheckpointCounter());
-		// First announcements and prioritsed barriers
-		List<AbstractEvent> events = new ArrayList<>();
-		events.add(gate.pollNext().get().getEvent());
-		events.add(gate.pollNext().get().getEvent());
+		for (int i = 0; i < numChannels; i++) {
+			assertAnnouncement(gate);
+		}
+		assertEquals(0, target.getTriggeredCheckpointCounter());
 
-		Thread.sleep(alignmentTimeout * 2);
+		Thread.sleep(alignmentTimeout * 4);
 
-		events.add(gate.pollNext().get().getEvent());
-		assertThat(events, contains(
-			instanceOf(EventAnnouncement.class),
-			instanceOf(EventAnnouncement.class),
-			instanceOf(CheckpointBarrier.class)));
+		assertBarrier(gate);
 		assertEquals(1, target.getTriggeredCheckpointCounter());
 	}
 
@@ -284,43 +251,29 @@ public class AlternatingControllerTest {
 	 */
 	@Test
 	public void testTimeoutAlignmentOnUnalignedCheckpoint() throws Exception {
-		int numChannels = 2;
 		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
 		RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
-		CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels, channelStateWriter);
-
-		long checkpointCreationTime = System.currentTimeMillis();
-		Buffer alignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, Integer.MAX_VALUE);
-		Buffer unalignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, 0);
-		Buffer buffer = TestBufferFactory.createBuffer(1000);
+		CheckpointedInputGate gate = buildRemoteInputGate(target, 2, channelStateWriter);
 
-		RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0);
-		RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1);
-		channel0.onBuffer(alignedCheckpointBarrier.retainBuffer(), 0, 0);
+		getChannel(gate, 0).onBuffer(withTimeout(Integer.MAX_VALUE).retainBuffer(), 0, 0);
 
-		List<AbstractEvent> events = new ArrayList<>();
-		events.add(gate.pollNext().get().getEvent());
-		events.add(gate.pollNext().get().getEvent());
+		assertAnnouncement(gate);
+		assertBarrier(gate);
 
-		assertThat(events, contains(
-			instanceOf(EventAnnouncement.class),
-			instanceOf(CheckpointBarrier.class)));
+		getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
+		getChannel(gate, 1).onBuffer(dataBuffer(), 1, 0);
+		getChannel(gate, 1).onBuffer(toBuffer(new CheckpointBarrier(1, System.currentTimeMillis(), unaligned(getDefault())), true).retainBuffer(), 2, 0);
 
-		channel1.onBuffer(buffer.retainBuffer(), 0, 0);
-		channel1.onBuffer(buffer.retainBuffer(), 1, 0);
-		channel1.onBuffer(unalignedCheckpointBarrier.retainBuffer(), 2, 0);
+		assertBarrier(gate);
 
-		events.add(gate.pollNext().get().getEvent());
-
-		assertThat(events, contains(
-			instanceOf(EventAnnouncement.class),
-			instanceOf(CheckpointBarrier.class),
-			instanceOf(CheckpointBarrier.class)));
-
-		assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()).size(), 2);
+		assertEquals(channelStateWriter.getAddedInput().get(getChannel(gate, 1).getChannelInfo()).size(), 2);
 		assertEquals(1, target.getTriggeredCheckpointCounter());
 	}
 
+	private RemoteInputChannel getChannel(CheckpointedInputGate gate, int channelIndex) {
+		return (RemoteInputChannel) gate.getChannel(channelIndex);
+	}
+
 	@Test
 	public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws Exception {
 		testTimeoutAlignmentConsistency(true, false, false);
@@ -352,9 +305,8 @@ public class AlternatingControllerTest {
 				new TestSubtaskCheckpointCoordinator(channelStateWriter),
 				gate));
 
-		long checkpointCreationTime = System.currentTimeMillis();
 		long alignmentTimeout = 10;
-		CheckpointBarrier barrier = checkpointBarrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout);
+		CheckpointBarrier barrier = new CheckpointBarrier(1, System.currentTimeMillis(), alignedNoTimeout(CHECKPOINT, getDefault()));
 
 		InputChannelInfo channelInfo = channel0.getChannelInfo();
 
@@ -417,12 +369,12 @@ public class AlternatingControllerTest {
 		long startNanos = System.nanoTime();
 		long checkpoint1CreationTime = System.currentTimeMillis() - 10;
 		sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 0);
-		sendBuffer(bufferSize, gate, 0);
-		sendBuffer(bufferSize, gate, 1);
+		sendData(bufferSize, 0, gate);
+		sendData(bufferSize, 1, gate);
 
 		Thread.sleep(6);
 		sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 1);
-		sendBuffer(bufferSize, gate, 0);
+		sendData(bufferSize, 0, gate);
 
 		assertMetrics(
 			target,
@@ -436,7 +388,7 @@ public class AlternatingControllerTest {
 		startNanos = System.nanoTime();
 		long checkpoint2CreationTime = System.currentTimeMillis() - 5;
 		sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0);
-		sendBuffer(bufferSize, gate, 1);
+		sendData(bufferSize, 1, gate);
 
 		assertMetrics(
 			target,
@@ -448,7 +400,7 @@ public class AlternatingControllerTest {
 			bufferSize * 2);
 		Thread.sleep(5);
 		sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 1);
-		sendBuffer(bufferSize, gate, 0);
+		sendData(bufferSize, 0, gate);
 
 		assertMetrics(
 			target,
@@ -461,9 +413,9 @@ public class AlternatingControllerTest {
 
 		startNanos = System.nanoTime();
 		long checkpoint3CreationTime = System.currentTimeMillis() - 7;
-		sendBarrier(3, checkpoint3CreationTime, CHECKPOINT, gate, 0);
-		sendBuffer(bufferSize, gate, 0);
-		sendBuffer(bufferSize, gate, 1);
+		send(barrier(3, checkpoint3CreationTime, unaligned(getDefault())), 0, gate);
+		sendData(bufferSize, 0, gate);
+		sendData(bufferSize, 1, gate);
 		assertMetrics(
 			target,
 			gate.getCheckpointBarrierHandler(),
@@ -473,7 +425,7 @@ public class AlternatingControllerTest {
 			7_000_000L,
 			-1L);
 		Thread.sleep(10);
-		sendBarrier(3, checkpoint2CreationTime, CHECKPOINT, gate, 1);
+		send(barrier(3, checkpoint2CreationTime, unaligned(getDefault())), 1, gate);
 		assertMetrics(
 			target,
 			gate.getCheckpointBarrierHandler(),
@@ -487,16 +439,15 @@ public class AlternatingControllerTest {
 	@Test
 	public void testMetricsSingleChannel() throws Exception {
 		int numChannels = 1;
-		int bufferSize = 1000;
 		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
 		CheckpointedInputGate gate = buildGate(target, numChannels);
 
 		long checkpoint1CreationTime = System.currentTimeMillis() - 10;
 		long startNanos = System.nanoTime();
 
-		sendBuffer(bufferSize, gate, 0);
+		sendData(1000, 0, gate);
 		sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 0);
-		sendBuffer(bufferSize, gate, 0);
+		sendData(1000, 0, gate);
 		Thread.sleep(6);
 		assertMetrics(
 			target,
@@ -509,9 +460,9 @@ public class AlternatingControllerTest {
 
 		long checkpoint2CreationTime = System.currentTimeMillis() - 5;
 		startNanos = System.nanoTime();
-		sendBuffer(bufferSize, gate, 0);
+		sendData(1000, 0, gate);
 		sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0);
-		sendBuffer(bufferSize, gate, 0);
+		sendData(1000, 0, gate);
 		Thread.sleep(5);
 		assertMetrics(
 			target,
@@ -561,7 +512,7 @@ public class AlternatingControllerTest {
 			if (type.isSavepoint()) {
 				channels[channel].setBlocked(true);
 			}
-			barrierHandler.processBarrier(new CheckpointBarrier(i, System.currentTimeMillis(), new CheckpointOptions(type, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, channel));
+			barrierHandler.processBarrier(new CheckpointBarrier(i, System.currentTimeMillis(), new CheckpointOptions(type, getDefault())), new InputChannelInfo(0, channel));
 			if (type.isSavepoint()) {
 				assertTrue(channels[channel].isBlocked());
 				assertFalse(channels[(channel + 1) % 2].isBlocked());
@@ -587,7 +538,7 @@ public class AlternatingControllerTest {
 		SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);
 
 		final long id = 1;
-		barrierHandler.processBarrier(new CheckpointBarrier(id, System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0));
+		barrierHandler.processBarrier(new CheckpointBarrier(id, System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, getDefault())), new InputChannelInfo(0, 0));
 
 		assertFalse(barrierHandler.getAllBarriersReceivedFuture(id).isDone());
 	}
@@ -604,9 +555,9 @@ public class AlternatingControllerTest {
 		long checkpointId = 10;
 		long outOfOrderSavepointId = 5;
 
-		barrierHandler.processBarrier(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0));
+		barrierHandler.processBarrier(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, getDefault())), new InputChannelInfo(0, 0));
 		secondChannel.setBlocked(true);
-		barrierHandler.processBarrier(new CheckpointBarrier(outOfOrderSavepointId, System.currentTimeMillis(), new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 1));
+		barrierHandler.processBarrier(new CheckpointBarrier(outOfOrderSavepointId, System.currentTimeMillis(), new CheckpointOptions(SAVEPOINT, getDefault())), new InputChannelInfo(0, 1));
 
 		assertEquals(checkpointId, barrierHandler.getLatestCheckpointId());
 		assertFalse(secondChannel.isBlocked());
@@ -627,9 +578,11 @@ public class AlternatingControllerTest {
 			slow.setBlocked(true);
 		}
 
-		sendBarrier(barrierId, checkpointType, fast, checkpointedGate);
+		CheckpointOptions options = checkpointType.isSavepoint() ? alignedNoTimeout(checkpointType, getDefault()) : unaligned(getDefault());
+		Buffer barrier = barrier(barrierId, 1, options);
+		send(barrier.retainBuffer(), fast, checkpointedGate);
 		assertEquals(checkpointType.isSavepoint(), target.triggeredCheckpoints.isEmpty());
-		sendBarrier(barrierId, checkpointType, slow, checkpointedGate);
+		send(barrier.retainBuffer(), slow, checkpointedGate);
 
 		assertEquals(singletonList(barrierId), target.triggeredCheckpoints);
 		if (checkpointType.isSavepoint()) {
@@ -642,29 +595,43 @@ public class AlternatingControllerTest {
 	}
 
 	private void sendBarrier(long barrierId, long barrierCreationTime, CheckpointType type, CheckpointedInputGate gate, int channelId) throws Exception {
-		send(barrier(barrierId, type, barrierCreationTime), gate, channelId);
+		send(barrier(barrierId, barrierCreationTime, alignedNoTimeout(type, getDefault())), channelId, gate);
 	}
 
-	private void send(Buffer buffer, CheckpointedInputGate gate, int channelId) throws Exception {
-		TestInputChannel channel = (TestInputChannel) gate.getChannel(channelId);
-		channel.read(buffer.retainBuffer());
-		while (gate.pollNext().isPresent()) {
-		}
+	private void sendData(int dataSize, int channelId, CheckpointedInputGate gate) throws Exception {
+		send(createBuffer(dataSize), channelId, gate);
 	}
 
-	private void sendBarrier(long barrierId, CheckpointType type, TestInputChannel channel, CheckpointedInputGate gate) throws Exception {
-		channel.read(barrier(barrierId, type).retainBuffer());
-		while (gate.pollNext().isPresent()) {
-		}
+	private void send(Buffer buffer, int channelId, CheckpointedInputGate gate) throws Exception {
+		send(buffer.retainBuffer(), gate.getChannel(channelId), gate);
 	}
 
-	private void sendBuffer(int bufferSize, CheckpointedInputGate gate, int channelId) throws Exception {
-		TestInputChannel channel = (TestInputChannel) gate.getChannel(channelId);
-		channel.read(TestBufferFactory.createBuffer(bufferSize));
-		while (gate.pollNext().isPresent()) {
+	private void send(Buffer buffer, InputChannel channel, CheckpointedInputGate checkpointedGate) throws IOException, InterruptedException {
+		if (channel instanceof TestInputChannel) {
+			((TestInputChannel) channel).read(buffer);
+		} else if (channel instanceof RemoteInputChannel) {
+			((RemoteInputChannel) channel).onBuffer(buffer, 0, 0);
+		} else {
+			throw new IllegalArgumentException("Unknown channel type: " + channel);
+		}
+		while (checkpointedGate.pollNext().isPresent()) {
 		}
 	}
 
+	private Buffer withTimeout(long alignmentTimeout) throws IOException {
+		return barrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), alignmentTimeout));
+	}
+
+	private Buffer barrier(long barrierId, long barrierTimestamp, CheckpointOptions options) throws IOException {
+		CheckpointBarrier checkpointBarrier = new CheckpointBarrier(
+			barrierId,
+			barrierTimestamp,
+			options);
+		return toBuffer(
+			checkpointBarrier,
+			checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint());
+	}
+
 	private static SingleCheckpointBarrierHandler barrierHandler(SingleInputGate inputGate, AbstractInvokable target) {
 		return barrierHandler(inputGate, target, new RecordingChannelStateWriter());
 	}
@@ -683,38 +650,6 @@ public class AlternatingControllerTest {
 				new UnalignedController(new TestSubtaskCheckpointCoordinator(stateWriter), inputGate)));
 	}
 
-	private Buffer barrier(long barrierId, CheckpointType checkpointType) throws IOException {
-		return barrier(barrierId, checkpointType, System.currentTimeMillis());
-	}
-
-	private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp) throws IOException {
-		return barrier(barrierId, checkpointType, barrierTimestamp, 0);
-	}
-
-	private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) throws IOException {
-		CheckpointBarrier checkpointBarrier = checkpointBarrier(
-			barrierId,
-			checkpointType,
-			barrierTimestamp,
-			alignmentTimeout);
-		return toBuffer(
-			checkpointBarrier,
-			checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint());
-	}
-
-	private CheckpointBarrier checkpointBarrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) {
-		CheckpointOptions options = CheckpointOptions.create(
-			checkpointType,
-			CheckpointStorageLocationReference.getDefault(),
-			true,
-			true,
-			alignmentTimeout);
-		return new CheckpointBarrier(
-			barrierId,
-			barrierTimestamp,
-			options);
-	}
-
 	private static CheckpointedInputGate buildGate(AbstractInvokable target, int numChannels) {
 		SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(numChannels).build();
 		TestInputChannel[] channels = new TestInputChannel[numChannels];
@@ -751,4 +686,30 @@ public class AlternatingControllerTest {
 		MailboxProcessor mailboxProcessor = new MailboxProcessor();
 		return new CheckpointedInputGate(gate, barrierHandler(gate, target, channelStateWriter), mailboxProcessor.getMainMailboxExecutor());
 	}
+
+	private static void assertAnnouncement(CheckpointedInputGate gate) throws IOException, InterruptedException {
+		assertEvent(gate, EventAnnouncement.class);
+	}
+
+	private static void assertBarrier(CheckpointedInputGate gate) throws IOException, InterruptedException {
+		assertEvent(gate, CheckpointBarrier.class);
+	}
+
+	private static <T extends RuntimeEvent> void assertEvent(CheckpointedInputGate gate, Class<T> clazz) throws IOException, InterruptedException {
+		Optional<BufferOrEvent> bufferOrEvent = assertPoll(gate);
+		assertTrue("expected event, got data buffer on " + bufferOrEvent.get().getChannelInfo(), bufferOrEvent.get().isEvent());
+		assertEquals(clazz, bufferOrEvent.get().getEvent().getClass());
+	}
+
+	private static <T extends RuntimeEvent> void assertData(CheckpointedInputGate gate) throws IOException, InterruptedException {
+		Optional<BufferOrEvent> bufferOrEvent = assertPoll(gate);
+		assertTrue("expected data, got " + bufferOrEvent.get().getEvent() + "  on " + bufferOrEvent.get().getChannelInfo(), bufferOrEvent.get().isBuffer());
+	}
+
+	private static Optional<BufferOrEvent> assertPoll(CheckpointedInputGate gate) throws IOException, InterruptedException {
+		Optional<BufferOrEvent> bufferOrEvent = gate.pollNext();
+		assertTrue("empty gate", bufferOrEvent.isPresent());
+		return bufferOrEvent;
+	}
+
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
index 6f449ef..7d401d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
@@ -84,7 +85,7 @@ public class InputProcessorUtilTest {
 			for (IndexedInputGate inputGate : allInputGates) {
 				for (int channelId = 0; channelId < inputGate.getNumberOfInputChannels(); channelId++) {
 					barrierHandler.processBarrier(
-						new CheckpointBarrier(1, 42, CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0)),
+						new CheckpointBarrier(1, 42, CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())),
 						new InputChannelInfo(inputGate.getGateIndex(), channelId));
 				}
 			}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index e916329..319c4f9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest {
 			deserializers);
 
 		inputGate.sendEvent(
-			new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()),
+			new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned()),
 			channelId);
 		inputGate.sendElement(new StreamRecord<>(42L), channelId);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
index 82ac3fe..1291457 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
@@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest {
 	}
 
 	private static CheckpointBarrier checkpoint(int checkpointId) {
-		return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut());
+		return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned());
 	}
 
 	private static CancelCheckpointMarker cancel(int checkpointId) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
index 7604746..7b64ee7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -39,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBui
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
@@ -59,8 +57,6 @@ import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -620,7 +616,7 @@ public class UnalignedControllerTest {
 			new CheckpointBarrier(
 				checkpointId,
 				timestamp,
-				new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT)),
+				CheckpointOptions.unaligned(getDefault())),
 			new InputChannelInfo(0, channel));
 	}
 
@@ -734,7 +730,7 @@ public class UnalignedControllerTest {
 	}
 
 	private CheckpointBarrier buildCheckpointBarrier(long id) {
-		return new CheckpointBarrier(id, 0, new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT));
+		return new CheckpointBarrier(id, 0, CheckpointOptions.unaligned(getDefault()));
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
index 5f912ab..d97bca2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
@@ -221,7 +221,7 @@ public class MultipleInputStreamTaskChainedSourcesCheckpointingTest {
 
 	private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> testHarness) {
 		StreamConfig config = testHarness.getStreamTask().getConfiguration();
-		CheckpointOptions checkpointOptions = CheckpointOptions.create(
+		CheckpointOptions checkpointOptions = CheckpointOptions.forConfig(
 				CheckpointType.CHECKPOINT,
 				CheckpointStorageLocationReference.getDefault(),
 				config.isExactlyOnceCheckpointMode(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 304e448..b83e2de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -104,8 +104,9 @@ public class SubtaskCheckpointCoordinatorTest {
 		MockWriter writer = new MockWriter();
 		SubtaskCheckpointCoordinator coordinator = coordinator(unalignedCheckpointEnabled, writer);
 		CheckpointStorageLocationReference locationReference = CheckpointStorageLocationReference.getDefault();
-		CheckpointOptions options = new CheckpointOptions(checkpointType, locationReference, true, unalignedCheckpointEnabled, 0);
-		coordinator.initCheckpoint(1L, options);
+		coordinator.initCheckpoint(1L, unalignedCheckpointEnabled ?
+			CheckpointOptions.unaligned(locationReference) :
+			CheckpointOptions.alignedNoTimeout(checkpointType, locationReference));
 		return writer.started;
 	}
 


[flink] 01/18: [hotfix][test] Improve error message in ValidatingCheckpointHandler

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4ba4df5824aecfb254591db4958842ae68d707b
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Oct 28 14:11:26 2020 +0100

    [hotfix][test] Improve error message in ValidatingCheckpointHandler
---
 .../flink/streaming/runtime/io/ValidatingCheckpointHandler.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
index 7f4d4fe..f426289 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -106,8 +107,9 @@ public class ValidatingCheckpointHandler extends AbstractInvokable {
 			CheckpointMetaData checkpointMetaData,
 			CheckpointOptions checkpointOptions,
 			CheckpointMetricsBuilder checkpointMetrics) {
-		assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L ||
-			nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
+		if (nextExpectedCheckpointId != -1L) {
+			assertEquals(nextExpectedCheckpointId, checkpointMetaData.getCheckpointId());
+		}
 		assertTrue(checkpointMetaData.getTimestamp() > 0);
 
 		nextExpectedCheckpointId = checkpointMetaData.getCheckpointId() + 1;
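
The hunk above replaces a boolean assertion with an equality assertion so that a failure reports both checkpoint ids. The sketch below illustrates the difference in plain Java. The helpers assertTrueWithMessage and assertEqualsLong are hypothetical stand-ins for the JUnit methods (they are not the JUnit implementation), and the class name is made up for illustration.

```java
public class CheckpointIdAssertionSketch {

    // Stand-in for assertTrue(String, boolean): a failure carries only the fixed message.
    static void assertTrueWithMessage(String message, boolean condition) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // Stand-in for assertEquals(long, long): a failure carries both values.
    static void assertEqualsLong(long expected, long actual) {
        if (expected != actual) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    // Shape of the check before the commit.
    static void oldCheck(long nextExpectedCheckpointId, long actualId) {
        assertTrueWithMessage(
            "wrong checkpoint id",
            nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == actualId);
    }

    // Shape of the check after the commit: -1 still means "no expectation yet".
    static void newCheck(long nextExpectedCheckpointId, long actualId) {
        if (nextExpectedCheckpointId != -1L) {
            assertEqualsLong(nextExpectedCheckpointId, actualId);
        }
    }

    // Runs a check and returns the failure message, or null if the check passed.
    static String messageOf(Runnable check) {
        try {
            check.run();
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageOf(() -> oldCheck(3L, 5L))); // wrong checkpoint id
        System.out.println(messageOf(() -> newCheck(3L, 5L))); // expected:<3> but was:<5>
    }
}
```

Both variants accept any id while the expectation is still -1. Only the second one tells the reader which id was expected and which arrived.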


[flink] 07/18: [hotfix][network] Report channel index if failied to deserialize

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6710045013ea8236cee06f48bc0c73a029f93bfe
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Nov 9 14:07:43 2020 +0100

    [hotfix][network] Report channel index if failied to deserialize
---
 .../apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 5084b8f..850d2ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -142,7 +142,12 @@ public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {
 		while (true) {
 			// get the stream element from the deserializer
 			if (currentRecordDeserializer != null) {
-				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
+				DeserializationResult result;
+				try {
+					result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
+				} catch (IOException e) {
+					throw new IOException(String.format("Can't get next record for channel %d", lastChannel), e);
+				}
 				if (result.isBufferConsumed()) {
 					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
 					currentRecordDeserializer = null;
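
The change above rethrows a deserialization failure with the channel index in the message while keeping the original exception as the cause. A minimal standalone sketch of that pattern follows. The RecordReader interface and the class name are hypothetical, and the channel is assumed to be a plain int so that the %d format applies.

```java
import java.io.IOException;

public class ChannelContextSketch {

    // Hypothetical stand-in for a per-channel record deserializer.
    interface RecordReader {
        String nextRecord() throws IOException;
    }

    // Reads one record; on failure, rethrows with the channel index added to the message.
    static String readWithChannelContext(RecordReader reader, int channelIndex) throws IOException {
        try {
            return reader.nextRecord();
        } catch (IOException e) {
            // The original exception stays attached as the cause, so no detail is lost.
            throw new IOException(
                String.format("Can't get next record for channel %d", channelIndex), e);
        }
    }

    public static void main(String[] args) {
        try {
            readWithChannelContext(() -> { throw new IOException("corrupt stream"); }, 3);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            System.out.println("cause: " + e.getCause().getMessage());
        }
    }
}
```

Wrapping in the same exception type keeps the method signature unchanged for callers, which matches the shape of the diff.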


[flink] 03/18: [FLINK-19681][checkpointing] Choose controler before processing first barrier or announcement

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d446d2a768d66e01ac6e473e953e95b2f4ac1b0
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Nov 3 18:30:23 2020 +0100

    [FLINK-19681][checkpointing] Choose controler before processing first barrier or announcement
---
 .../streaming/runtime/io/AlignedController.java    |  4 ++
 .../runtime/io/AlternatingController.java          |  7 +++-
 .../io/CheckpointBarrierBehaviourController.java   |  5 +++
 .../runtime/io/SingleCheckpointBarrierHandler.java | 46 ++++++++++++----------
 .../streaming/runtime/io/UnalignedController.java  |  4 ++
 5 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index 8f0e409..f3999a4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -49,6 +49,10 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 	}
 
 	@Override
+	public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) {
+	}
+
+	@Override
 	public void barrierReceived(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index eb302ce..d040c66 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -44,6 +44,11 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	}
 
 	@Override
+	public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) {
+		activeController = chooseController(barrier);
+	}
+
+	@Override
 	public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
 		checkActiveController(barrier);
 		activeController.barrierReceived(channelInfo, barrier);
@@ -53,7 +58,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	public boolean preProcessFirstBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException, CheckpointException {
-		activeController = chooseController(barrier);
+		checkActiveController(barrier);
 		return activeController.preProcessFirstBarrier(channelInfo, barrier);
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
index f38d745..5939451 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
@@ -32,6 +32,11 @@ import java.io.IOException;
 public interface CheckpointBarrierBehaviourController {
 
 	/**
+	 * Invoked before first {@link CheckpointBarrier} or it's announcement.
+	 */
+	void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier);
+
+	/**
 	 * Invoked per every received {@link CheckpointBarrier}.
 	 */
 	void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
index ef8eb45..fa1ee00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
@@ -66,6 +66,8 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 	 */
 	private long currentCheckpointId = -1L;
 
+	private long lastCancelledOrCompletedCheckpointId = -1L;
+
 	private int numOpenChannels;
 
 	private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();
@@ -105,18 +107,14 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 			return;
 		}
 
-		if (currentCheckpointId < barrierId) {
-			if (isCheckpointPending()) {
-				cancelSubsumedCheckpoint(barrierId);
-			}
+		checkSubsumedCheckpoint(channelInfo, barrier);
 
+		if (numBarriersReceived == 0) {
 			if (getNumOpenChannels() == 1) {
 				markAlignmentStartAndEnd(barrier.getTimestamp());
 			} else {
 				markAlignmentStart(barrier.getTimestamp());
 			}
-			currentCheckpointId = barrierId;
-			numBarriersReceived = 0;
 			allBarriersReceivedFuture = new CompletableFuture<>();
 			try {
 				if (controller.preProcessFirstBarrier(channelInfo, barrier)) {
@@ -140,6 +138,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 					markAlignmentEnd();
 				}
 				numBarriersReceived = 0;
+				lastCancelledOrCompletedCheckpointId = currentCheckpointId;
 				if (controller.postProcessLastBarrier(channelInfo, barrier)) {
 					LOG.debug("{}: Triggering checkpoint {} on the last barrier at {}.",
 						taskName,
@@ -157,9 +156,22 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 			CheckpointBarrier announcedBarrier,
 			int sequenceNumber,
 			InputChannelInfo channelInfo) throws IOException {
+		checkSubsumedCheckpoint(channelInfo, announcedBarrier);
 		// TODO: FLINK-19681
 	}
 
+	private void checkSubsumedCheckpoint(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException {
+		long barrierId = barrier.getId();
+		if (currentCheckpointId < barrierId) {
+			if (isCheckpointPending()) {
+				cancelSubsumedCheckpoint(currentCheckpointId);
+			}
+			currentCheckpointId = barrierId;
+			numBarriersReceived = 0;
+			controller.preProcessFirstBarrierOrAnnouncement(barrier);
+		}
+	}
+
 	@Override
 	public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException {
 		final long cancelledId = cancelBarrier.getCheckpointId();
@@ -168,10 +180,15 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 		}
 	}
 
+	private void abortInternal(long cancelledId, CheckpointFailureReason reason) throws IOException {
+		abortInternal(cancelledId, new CheckpointException(reason));
+	}
+
 	private void abortInternal(long cancelledId, CheckpointException exception) throws IOException {
 		// by setting the currentCheckpointId to this checkpoint while keeping the numBarriers
 		// at zero means that no checkpoint barrier can start a new alignment
 		currentCheckpointId = Math.max(cancelledId, currentCheckpointId);
+		lastCancelledOrCompletedCheckpointId = Math.max(lastCancelledOrCompletedCheckpointId, cancelledId);
 		numBarriersReceived = 0;
 		controller.abortPendingCheckpoint(cancelledId, exception);
 		allBarriersReceivedFuture.completeExceptionally(exception);
@@ -187,11 +204,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 				"{}: Received EndOfPartition(-1) before completing current checkpoint {}. Skipping current checkpoint.",
 				taskName,
 				currentCheckpointId);
-			numBarriersReceived = 0;
-			CheckpointException exception = new CheckpointException(CHECKPOINT_DECLINED_INPUT_END_OF_STREAM);
-			controller.abortPendingCheckpoint(currentCheckpointId, exception);
-			allBarriersReceivedFuture.completeExceptionally(exception);
-			notifyAbort(currentCheckpointId, exception);
+			abortInternal(currentCheckpointId, CHECKPOINT_DECLINED_INPUT_END_OF_STREAM);
 		}
 	}
 
@@ -208,23 +221,16 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 
 	@Override
 	protected boolean isCheckpointPending() {
-		return numBarriersReceived > 0;
+		return currentCheckpointId != lastCancelledOrCompletedCheckpointId && currentCheckpointId >= 0;
 	}
 
 	private void cancelSubsumedCheckpoint(long barrierId) throws IOException {
-		CheckpointException exception = new CheckpointException("Barrier id: " + barrierId,
-			CHECKPOINT_DECLINED_SUBSUMED);
-		// we did not complete the current checkpoint, another started before
 		LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
 				"Skipping current checkpoint.",
 			taskName,
 			barrierId,
 			currentCheckpointId);
-
-		// let the task know we are not completing this
-		controller.abortPendingCheckpoint(currentCheckpointId, exception);
-		allBarriersReceivedFuture.completeExceptionally(exception);
-		notifyAbort(currentCheckpointId, exception);
+		abortInternal(currentCheckpointId, CHECKPOINT_DECLINED_SUBSUMED);
 	}
 
 	public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
index d53b4ec..3d5f575 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
@@ -44,6 +44,10 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
 	}
 
 	@Override
+	public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) {
+	}
+
+	@Override
 	public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
 	}
 


[flink] 06/18: [FLINK-19681][config][checkpointing] Un-hide alignment timeout option

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 28a80142989b2be2726b7392074714baf5650462
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Dec 2 14:17:01 2020 +0100

    [FLINK-19681][config][checkpointing] Un-hide alignment timeout option
---
 docs/_includes/generated/execution_checkpointing_configuration.html | 6 ++++++
 .../streaming/api/environment/ExecutionCheckpointingOptions.java    | 1 -
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/docs/_includes/generated/execution_checkpointing_configuration.html b/docs/_includes/generated/execution_checkpointing_configuration.html
index 94219b1..a3325e4 100644
--- a/docs/_includes/generated/execution_checkpointing_configuration.html
+++ b/docs/_includes/generated/execution_checkpointing_configuration.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>execution.checkpointing.alignment-timeout</h5></td>
+            <td style="word-wrap: break-word;">0 ms</td>
+            <td>Duration</td>
+            <td>Only relevant if <span markdown="span">`execution.checkpointing.unaligned`</span> is enabled.<br /><br />If timeout is 0, checkpoints will always start unaligned.<br /><br />If timeout has a positive value, checkpoints will start aligned. If during checkpointing, checkpoint start delay exceeds this timeout, alignment will timeout and checkpoint barrier will start working as unaligned checkpoint.</td>
+        </tr>
+        <tr>
             <td><h5>execution.checkpointing.externalized-checkpoint-retention</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td><p>Enum</p>Possible values: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]</td>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 7ba5cb5..32530fd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -145,7 +145,6 @@ public class ExecutionCheckpointingOptions {
 					TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()))
 				.build());
 
-	@Documentation.ExcludeFromDocumentation("Do not advertise this option until timeout in unaligned checkpoint is completed.")
 	public static final ConfigOption<Duration> ALIGNMENT_TIMEOUT =
 		ConfigOptions.key("execution.checkpointing.alignment-timeout")
 			.durationType()


[flink] 02/18: [hotfix][test] Fix StreamConfig propagation to StreamTask in StreamTaskMailboxTestHarnessBuilder

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37e0fb44162b4c777ae47173d61babd3fb4ae7fe
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Oct 28 18:20:45 2020 +0100

    [hotfix][test] Fix StreamConfig propagation to StreamTask in StreamTaskMailboxTestHarnessBuilder
---
 .../streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index dc69ed4..25bec13 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -74,8 +74,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 	protected long memorySize = 1024 * 1024;
 	protected int bufferSize = 1024;
 	protected Configuration jobConfig = new Configuration();
-	protected Configuration taskConfig = new Configuration();
-	protected StreamConfig streamConfig = new StreamConfig(taskConfig);
+	protected StreamConfig streamConfig = new StreamConfig(new Configuration());
 	protected LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();
 	@Nullable
 	protected StreamTestSingleInputGate[] inputGates;
@@ -138,7 +137,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 
 		StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment(
 			jobConfig,
-			taskConfig,
+			streamConfig.getConfiguration(),
 			executionConfig,
 			memorySize,
 			new MockInputSplitProvider(),
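
The fix above drops the separate taskConfig field and reads the configuration from streamConfig at build time. One way the two can diverge, assuming the streamConfig field is reassigned somewhere after construction (the diff does not show where), is sketched below in plain Java. WrapperConfig is a hypothetical stand-in for StreamConfig backed by a map, and the key and value are made up.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigPropagationSketch {

    // Hypothetical stand-in for StreamConfig: a thin wrapper over a backing configuration.
    static final class WrapperConfig {
        private final Map<String, String> backing;

        WrapperConfig(Map<String, String> backing) {
            this.backing = backing;
        }

        Map<String, String> getConfiguration() {
            return backing;
        }

        void set(String key, String value) {
            backing.put(key, value);
        }
    }

    // Old shape: a separately stored backing map goes stale once the wrapper is replaced.
    static boolean settingVisibleViaSeparateField() {
        Map<String, String> taskConfig = new HashMap<>();
        WrapperConfig streamConfig = new WrapperConfig(taskConfig);
        streamConfig = new WrapperConfig(new HashMap<>()); // wrapper replaced later
        streamConfig.set("some.key", "value");
        return taskConfig.containsKey("some.key");
    }

    // New shape: the backing map is always obtained from the current wrapper.
    static boolean settingVisibleViaWrapper() {
        WrapperConfig streamConfig = new WrapperConfig(new HashMap<>());
        streamConfig = new WrapperConfig(new HashMap<>()); // wrapper replaced later
        streamConfig.set("some.key", "value");
        return streamConfig.getConfiguration().containsKey("some.key");
    }

    public static void main(String[] args) {
        System.out.println(settingVisibleViaSeparateField()); // false
        System.out.println(settingVisibleViaWrapper());       // true
    }
}
```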


[flink] 05/18: [FLINK-19681][tests] Adjust alignmentTimeout in unaligned checkpoint ITCases

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c0b9089f1172815b9c711f04a28ca4657acd0357
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Oct 29 17:51:49 2020 +0100

    [FLINK-19681][tests] Adjust alignmentTimeout in unaligned checkpoint ITCases
---
 .../flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java | 1 +
 .../org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java | 1 +
 2 files changed, 2 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
index 6d69576..2e054a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
@@ -186,6 +186,7 @@ public class UnalignedCheckpointCompatibilityITCase extends TestLogger {
 		env.setParallelism(PARALLELISM);
 		env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
 		env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
+		env.getCheckpointConfig().setAlignmentTimeout(0);
 		env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
 		if (checkpointingInterval > 0) {
 			env.enableCheckpointing(checkpointingInterval);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 2acf7b6..2904de7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -538,6 +538,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
 
 			final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
 			env.enableCheckpointing(100);
+			env.getCheckpointConfig().setAlignmentTimeout(1);
 			env.setParallelism(parallelism);
 			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(generateCheckpoint ? expectedFailures / 2 : expectedFailures, Time.milliseconds(100)));
 			env.getCheckpointConfig().enableUnalignedCheckpoints(true);


[flink] 13/18: [FLINK-19681][checkpointing] Address minor feedback

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf3b861c0b564f88df9f0e00fb9b90ab63f2a188
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Dec 3 11:21:50 2020 +0100

    [FLINK-19681][checkpointing] Address minor feedback
---
 .../runtime/checkpoint/CheckpointOptions.java      |  4 +--
 .../runtime/io/network/api/CheckpointBarrier.java  |  2 +-
 .../io/network/partition/PrioritizedDeque.java     |  2 +-
 .../streaming/runtime/io/AlignedController.java    |  1 +
 .../runtime/io/AlternatingController.java          | 31 ++++++++--------------
 .../runtime/io/StreamTaskNetworkInputTest.java     |  2 +-
 .../io/UnalignedControllerCancellationTest.java    |  2 +-
 7 files changed, 18 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index b092a92..1f12e7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -98,7 +98,7 @@ public class CheckpointOptions implements Serializable {
 	}
 
 	public boolean isTimeoutable() {
-		return alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT;
+		return !isUnalignedCheckpoint && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT);
 	}
 
 	// ------------------------------------------------------------------------
@@ -190,7 +190,7 @@ public class CheckpointOptions implements Serializable {
 			alignmentTimeout);
 	}
 
-	public CheckpointOptions toTimeouted() {
+	public CheckpointOptions asTimedOut() {
 		checkState(checkpointType == CheckpointType.CHECKPOINT);
 		return create(
 			checkpointType,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 8058f7b..e734616 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -119,6 +119,6 @@ public class CheckpointBarrier extends RuntimeEvent {
 	}
 
 	public CheckpointBarrier asUnaligned() {
-		return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toTimeouted());
+		return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut());
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
index 77108d0..7b7c282 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
@@ -150,7 +150,7 @@ public final class PrioritizedDeque<T> implements Iterable<T> {
 	 */
 	public T getAndRemove(Predicate<T> preCondition) {
 		Iterator<T> iterator = deque.iterator();
-		for (int i = 0; i < deque.size(); i++) {
+		for (int i = 0; iterator.hasNext(); i++) {
 			T next = iterator.next();
 			if (preCondition.test(next)) {
 				if (i < numPriorityElements) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index be6f2ef..033c626 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -76,6 +76,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 
 	@Override
 	public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) {
+		sequenceNumberInAnnouncedChannels.clear();
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 06e6bdb..080a7ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
@@ -40,7 +39,6 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	private final UnalignedController unalignedController;
 
 	private CheckpointBarrierBehaviourController activeController;
-	private long timeOutedBarrierId = -1; // used to shortcut timeout check
 
 	public AlternatingController(
 			AlignedController alignedController,
@@ -52,6 +50,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 	@Override
 	public void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier) {
 		activeController = chooseController(barrier);
+		activeController.preProcessFirstBarrierOrAnnouncement(barrier);
 	}
 
 	@Override
@@ -60,7 +59,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 			CheckpointBarrier announcedBarrier,
 			int sequenceNumber) throws IOException {
 
-		Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(announcedBarrier);
+		Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(announcedBarrier);
 		announcedBarrier = maybeTimedOut.orElse(announcedBarrier);
 
 		if (maybeTimedOut.isPresent() && activeController != unalignedController) {
@@ -81,7 +80,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 			return Optional.of(barrier);
 		}
 
-		Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(barrier);
+		Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(barrier);
 		barrier = maybeTimedOut.orElse(barrier);
 
 		checkState(!activeController.barrierReceived(channelInfo, barrier).isPresent());
@@ -135,7 +134,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 
 	@Override
 	public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-		Optional<CheckpointBarrier> maybeTimeOut = maybeTimeOut(barrier);
+		Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier);
 		if (maybeTimeOut.isPresent() && activeController == alignedController) {
 			switchToUnaligned(channelInfo, maybeTimeOut.get());
 			checkState(activeController == unalignedController);
@@ -186,20 +185,12 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 		return isAligned(barrier) ? alignedController : unalignedController;
 	}
 
-	private Optional<CheckpointBarrier> maybeTimeOut(CheckpointBarrier barrier) {
-		CheckpointOptions options = barrier.getCheckpointOptions();
-		boolean shouldTimeout = (options.isTimeoutable()) && (
-			barrier.getId() == timeOutedBarrierId ||
-				(System.currentTimeMillis() - barrier.getTimestamp()) > options.getAlignmentTimeout());
-		if (options.isUnalignedCheckpoint() || !shouldTimeout) {
-			return Optional.empty();
-		}
-		else {
-			timeOutedBarrierId = Math.max(timeOutedBarrierId, barrier.getId());
-			return Optional.of(new CheckpointBarrier(
-				barrier.getId(),
-				barrier.getTimestamp(),
-				options.toTimeouted()));
-		}
+	private Optional<CheckpointBarrier> asTimedOut(CheckpointBarrier barrier) {
+		return Optional.of(barrier).filter(this::canTimeout).map(CheckpointBarrier::asUnaligned);
+	}
+
+	private boolean canTimeout(CheckpointBarrier barrier) {
+		return barrier.getCheckpointOptions().isTimeoutable() &&
+			barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 236550c..e916329 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest {
 			deserializers);
 
 		inputGate.sendEvent(
-			new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted()),
+			new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()),
 			channelId);
 		inputGate.sendElement(new StreamRecord<>(42L), channelId);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
index 47436c9..82ac3fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
@@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest {
 	}
 
 	private static CheckpointBarrier checkpoint(int checkpointId) {
-		return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted());
+		return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut());
 	}
 
 	private static CancelCheckpointMarker cancel(int checkpointId) {
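
Editor's note on the `canTimeout` / `asTimedOut` pair in the BarrierHandler diff above: the check is a strict comparison between the configured alignment timeout and the barrier's age. A minimal sketch of that logic follows. The `Barrier` class, its fields, and the injected `nowMillis` parameter are hypothetical stand-ins for illustration (the real code reads `System.currentTimeMillis()` and `CheckpointOptions`), not Flink's actual classes.

```java
import java.util.Optional;

final class AlignmentTimeoutSketch {

    /** Hypothetical, simplified stand-in for a barrier plus its options (not a Flink class). */
    static final class Barrier {
        final long id;
        final long timestamp;         // creation time in milliseconds
        final boolean timeoutable;    // whether alignment may time out at all
        final long alignmentTimeout;  // milliseconds
        final boolean unaligned;

        Barrier(long id, long timestamp, boolean timeoutable, long alignmentTimeout, boolean unaligned) {
            this.id = id;
            this.timestamp = timestamp;
            this.timeoutable = timeoutable;
            this.alignmentTimeout = alignmentTimeout;
            this.unaligned = unaligned;
        }

        /** Returns this barrier if already unaligned, otherwise an unaligned copy. */
        Barrier asUnaligned() {
            return unaligned ? this : new Barrier(id, timestamp, timeoutable, alignmentTimeout, true);
        }
    }

    /** True only if the barrier may time out and is strictly older than the timeout. */
    static boolean canTimeout(Barrier barrier, long nowMillis) {
        return barrier.timeoutable && barrier.alignmentTimeout < nowMillis - barrier.timestamp;
    }

    /** Empty if the barrier has not timed out; otherwise the barrier converted to unaligned. */
    static Optional<Barrier> asTimedOut(Barrier barrier, long nowMillis) {
        return Optional.of(barrier)
            .filter(b -> canTimeout(b, nowMillis))
            .map(Barrier::asUnaligned);
    }

    public static void main(String[] args) {
        Barrier barrier = new Barrier(1L, 1_000L, true, 100L, false);
        System.out.println(asTimedOut(barrier, 1_050L).isPresent()); // age 50 ms
        System.out.println(asTimedOut(barrier, 1_200L).isPresent()); // age 200 ms
    }
}
```

Passing the clock value as a parameter is a choice made here only so that the sketch is deterministic; it is not something the commit does.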


[flink] 08/18: [hotfix][checkpointing] Add preconditions to channels and controllers

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f11452e6a29f15eb08aeb7ac4722691914cbe6fc
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Nov 11 12:29:09 2020 +0100

    [hotfix][checkpointing] Add preconditions to channels and controllers
---
 .../org/apache/flink/streaming/runtime/io/AlignedController.java | 3 +++
 .../apache/flink/streaming/runtime/io/UnalignedController.java   | 4 ++++
 .../flink/streaming/runtime/io/StreamTaskNetworkInputTest.java   | 2 +-
 .../runtime/io/UnalignedControllerCancellationTest.java          | 2 +-
 .../flink/streaming/runtime/io/UnalignedControllerTest.java      | 9 +++++++--
 5 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index a2ef674..0991dc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -92,6 +93,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 	public Optional<CheckpointBarrier> preProcessFirstBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) {
+		checkArgument(!barrier.getCheckpointOptions().isUnalignedCheckpoint(), "Unaligned barrier is not expected");
 		return Optional.empty();
 	}
 
@@ -99,6 +101,7 @@ public class AlignedController implements CheckpointBarrierBehaviourController {
 	public Optional<CheckpointBarrier> postProcessLastBarrier(
 			InputChannelInfo channelInfo,
 			CheckpointBarrier barrier) throws IOException {
+		checkState(!barrier.getCheckpointOptions().isUnalignedCheckpoint());
 		resumeConsumption();
 		return Optional.of(barrier);
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
index d19533e..6141918 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -53,6 +54,7 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
 			InputChannelInfo channelInfo,
 			CheckpointBarrier announcedBarrier,
 			int sequenceNumber) throws IOException {
+		Preconditions.checkState(announcedBarrier.isCheckpoint());
 		inputs[channelInfo.getGateIdx()].convertToPriorityEvent(
 			channelInfo.getInputChannelIdx(),
 			sequenceNumber);
@@ -65,6 +67,7 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
 
 	@Override
 	public Optional<CheckpointBarrier> preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
+		Preconditions.checkArgument(barrier.getCheckpointOptions().isUnalignedCheckpoint(), "Aligned barrier not expected");
 		checkpointCoordinator.initCheckpoint(barrier.getId(), barrier.getCheckpointOptions());
 		for (final CheckpointableInput input : inputs) {
 			input.checkpointStarted(barrier);
@@ -74,6 +77,7 @@ public class UnalignedController implements CheckpointBarrierBehaviourController
 
 	@Override
 	public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
+		// note that barrier can be aligned if checkpoint timed out in between; event is not converted
 		resetPendingCheckpoint(barrier.getId());
 		return Optional.empty();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 5f7498f..236550c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest {
 			deserializers);
 
 		inputGate.sendEvent(
-			new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()),
+			new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted()),
 			channelId);
 		inputGate.sendElement(new StreamRecord<>(42L), channelId);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
index 316560d..47436c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
@@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest {
 	}
 
 	private static CheckpointBarrier checkpoint(int checkpointId) {
-		return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation());
+		return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toTimeouted());
 	}
 
 	private static CancelCheckpointMarker cancel(int checkpointId) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
index a365bf5..7604746 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBui
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
@@ -57,6 +59,9 @@ import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -615,7 +620,7 @@ public class UnalignedControllerTest {
 			new CheckpointBarrier(
 				checkpointId,
 				timestamp,
-				CheckpointOptions.forCheckpointWithDefaultLocation()),
+				new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT)),
 			new InputChannelInfo(0, channel));
 	}
 
@@ -729,7 +734,7 @@ public class UnalignedControllerTest {
 	}
 
 	private CheckpointBarrier buildCheckpointBarrier(long id) {
-		return new CheckpointBarrier(id, 0, CheckpointOptions.forCheckpointWithDefaultLocation());
+		return new CheckpointBarrier(id, 0, new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT));
 	}
 
 	// ------------------------------------------------------------------------
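
Editor's note on the preconditions added in this commit: `checkArgument` guards the barrier passed in by the caller, while `checkState` guards an invariant that should already hold. The sketch below illustrates the distinction with local helper methods that mirror the exception types of Flink's `Preconditions` (`IllegalArgumentException` and `IllegalStateException`). The controller methods are reduced to a single boolean flag and are hypothetical simplifications, not the real signatures.

```java
final class ControllerPreconditionsSketch {

    // Local stand-ins that throw the same exception types as Flink's Preconditions helpers.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static void checkState(boolean condition) {
        if (!condition) {
            throw new IllegalStateException();
        }
    }

    /** Hypothetical reduction of AlignedController#preProcessFirstBarrier. */
    static void alignedPreProcessFirstBarrier(boolean barrierIsUnaligned) {
        checkArgument(!barrierIsUnaligned, "Unaligned barrier is not expected");
    }

    /** Hypothetical reduction of AlignedController#postProcessLastBarrier. */
    static void alignedPostProcessLastBarrier(boolean barrierIsUnaligned) {
        checkState(!barrierIsUnaligned);
    }

    /** Hypothetical reduction of UnalignedController#preProcessFirstBarrier. */
    static void unalignedPreProcessFirstBarrier(boolean barrierIsUnaligned) {
        checkArgument(barrierIsUnaligned, "Aligned barrier not expected");
    }

    /** Runs the call and reports "ok" or the simple name of the exception thrown. */
    static String outcome(Runnable call) {
        try {
            call.run();
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(() -> alignedPreProcessFirstBarrier(false)));
        System.out.println(outcome(() -> alignedPreProcessFirstBarrier(true)));
        System.out.println(outcome(() -> alignedPostProcessLastBarrier(true)));
        System.out.println(outcome(() -> unalignedPreProcessFirstBarrier(false)));
    }
}
```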


[flink] 09/18: [FLINK-19681][checkpointing] Fix barrier tracking in input channels

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b49ad6731c9798cb803b058ad02955c6fb3a2f05
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Nov 9 14:10:44 2020 +0100

    [FLINK-19681][checkpointing] Fix barrier tracking in input channels
    
    LocalInputChannel:
    Reset lastSeenBarrier for any barrier, not just UC.
    Local channels receive no announcements, so
    lastSeenBarrier may not be reset for AC, and as a result
    extra buffers may be added to state.
    
    Reduces failure frequency in UnalignedCheckpointIT par-local case.
    
    RemoteInputChannel:
    Don't update tracking state during conversion. Only do it
    upon receiving a barrier.
    
    Reduces failure frequency in UnalignedCheckpointIT par-remote case.
---
 .../partition/consumer/ChannelStatePersister.java  | 28 ++++++++--------
 .../partition/consumer/LocalInputChannel.java      |  7 ++--
 .../partition/consumer/RemoteInputChannel.java     | 22 +++++++------
 .../partition/consumer/LocalInputChannelTest.java  | 38 ++++++++++++++++++++--
 4 files changed, 63 insertions(+), 32 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
index 924e30f..5742279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
@@ -88,16 +88,16 @@ final class ChannelStatePersister {
 	}
 
 	protected Optional<Long> checkForBarrier(Buffer buffer) throws IOException {
-		final AbstractEvent priorityEvent = parsePriorityEvent(buffer);
-		if (priorityEvent instanceof CheckpointBarrier) {
-			if (((CheckpointBarrier) priorityEvent).getId() >= lastSeenBarrier) {
+		final AbstractEvent event = parseEvent(buffer);
+		if (event instanceof CheckpointBarrier) {
+			if (((CheckpointBarrier) event).getId() >= lastSeenBarrier) {
 				checkpointStatus = CheckpointStatus.BARRIER_RECEIVED;
-				lastSeenBarrier = ((CheckpointBarrier) priorityEvent).getId();
+				lastSeenBarrier = ((CheckpointBarrier) event).getId();
 				return Optional.of(lastSeenBarrier);
 			}
 		}
-		else if (priorityEvent instanceof EventAnnouncement) {
-			EventAnnouncement announcement = (EventAnnouncement) priorityEvent;
+		if (event instanceof EventAnnouncement) { // NOTE: only remote channels
+			EventAnnouncement announcement = (EventAnnouncement) event;
 			if (announcement.getAnnouncedEvent() instanceof CheckpointBarrier) {
 				return Optional.of(((CheckpointBarrier) announcement.getAnnouncedEvent()).getId());
 			}
@@ -110,16 +110,16 @@ final class ChannelStatePersister {
 	 * returns null in all other cases.
 	 */
 	@Nullable
-	protected AbstractEvent parsePriorityEvent(Buffer buffer) throws IOException {
-		if (buffer.isBuffer() || !buffer.getDataType().hasPriority()) {
+	protected AbstractEvent parseEvent(Buffer buffer) throws IOException {
+		if (buffer.isBuffer()) {
 			return null;
+		} else {
+			AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+			// reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer.
+			// we can further improve to avoid double deserialization in the future.
+			buffer.setReaderIndex(0);
+			return event;
 		}
-
-		AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
-		// reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer.
-		// we can further improve to avoid double deserialization in the future.
-		buffer.setReaderIndex(0);
-		return event;
 	}
 
 	protected boolean hasBarrierReceived() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index a0d69c5..0fafb36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -229,11 +229,8 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
 		numBytesIn.inc(buffer.getSize());
 		numBuffersIn.inc();
-		if (buffer.getDataType().hasPriority()) {
-			channelStatePersister.checkForBarrier(buffer);
-		} else {
-			channelStatePersister.maybePersist(buffer);
-		}
+		channelStatePersister.checkForBarrier(buffer);
+		channelStatePersister.maybePersist(buffer);
 		return Optional.of(new BufferAndAvailability(
 			buffer,
 			next.getNextDataType(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 27fa58f..c012766 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -451,11 +451,20 @@ public class RemoteInputChannel extends InputChannel {
 				}
 				else {
 					receivedBuffers.add(sequenceBuffer);
-					channelStatePersister.maybePersist(buffer);
 					if (dataType.requiresAnnouncement()) {
 						firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
 					}
 				}
+				channelStatePersister
+					.checkForBarrier(sequenceBuffer.buffer)
+					.filter(id -> id > lastBarrierId)
+					.ifPresent(id -> {
+						// checkpoint was not yet started by task thread,
+						// so remember the numbers of buffers to spill for the time when it will be started
+						lastBarrierId = id;
+						lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+					});
+				channelStatePersister.maybePersist(buffer);
 				++expectedSequenceNumber;
 			}
 			recycleBuffer = false;
@@ -480,15 +489,8 @@ public class RemoteInputChannel extends InputChannel {
 	/**
 	 * @return {@code true} if this was first priority buffer added.
 	 */
-	private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) throws IOException {
+	private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) {
 		receivedBuffers.addPriorityElement(sequenceBuffer);
-		channelStatePersister
-			.checkForBarrier(sequenceBuffer.buffer)
-			.filter(id -> id > lastBarrierId)
-			.ifPresent(id -> {
-				lastBarrierId = id;
-				lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
-			});
 		return receivedBuffers.getNumPriorityElements() == 1;
 	}
 
@@ -552,7 +554,7 @@ public class RemoteInputChannel extends InputChannel {
 	public void convertToPriorityEvent(int sequenceNumber) throws IOException {
 		boolean firstPriorityEvent;
 		synchronized (receivedBuffers) {
-			checkState(!channelStatePersister.hasBarrierReceived());
+			checkState(channelStatePersister.hasBarrierReceived());
 			int numPriorityElementsBeforeRemoval = receivedBuffers.getNumPriorityElements();
 			SequenceBuffer toPrioritize = receivedBuffers.getAndRemove(
 				sequenceBuffer -> sequenceBuffer.sequenceNumber == sequenceNumber);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 08cdf6c..8785a68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -29,8 +29,10 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
@@ -54,6 +56,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import org.hamcrest.Matchers;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -73,6 +76,7 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate;
 import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.TestingResultPartitionManager;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
@@ -94,6 +98,30 @@ import static org.mockito.Mockito.when;
  */
 public class LocalInputChannelTest {
 
+	@Test
+	public void testNoDataPersistedAfterReceivingAlignedBarrier() throws Exception {
+		CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, CheckpointOptions.alignedWithTimeout(getDefault(), 123L));
+		BufferConsumer barrierHolder = new BufferConsumer(EventSerializer.toBuffer(barrier, false).getMemorySegment(), FreeingBufferRecycler.INSTANCE, Buffer.DataType.EVENT_BUFFER);
+		BufferConsumer data = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1);
+
+		RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
+		LocalInputChannel channel = InputChannelBuilder
+			.newBuilder()
+			.setPartitionManager(new TestingResultPartitionManager(createResultSubpartitionView(barrierHolder, data)))
+			.setStateWriter(stateWriter)
+			.buildLocalChannel(new SingleInputGateBuilder().build());
+		channel.requestSubpartition(0);
+
+		// pull AC barrier
+		channel.getNextBuffer();
+		// pretend that alignment timed out
+		stateWriter.start(barrier.getId(), barrier.getCheckpointOptions());
+		channel.checkpointStarted(barrier);
+		// pull data
+		channel.getNextBuffer();
+		Assert.assertTrue("no data should be persisted after receiving a barrier", stateWriter.getAddedInput().isEmpty());
+	}
+
 	/**
 	 * Tests the consumption of multiple subpartitions via local input channels.
 	 *
@@ -472,7 +500,7 @@ public class LocalInputChannelTest {
 		inputGate.setInputChannels(channel);
 		channel.requestSubpartition(0);
 
-		final CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
+		final CheckpointStorageLocationReference location = getDefault();
 		CheckpointOptions options = new CheckpointOptions(CheckpointType.CHECKPOINT, location, true, true, 0);
 		stateWriter.start(0, options);
 
@@ -497,6 +525,10 @@ public class LocalInputChannelTest {
 	// ---------------------------------------------------------------------------------------------
 
 	private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer) throws IOException {
+		return addBuffer ? createResultSubpartitionView(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096)) : createResultSubpartitionView();
+	}
+
+	private static ResultSubpartitionView createResultSubpartitionView(BufferConsumer... buffers) throws IOException {
 		int bufferSize = 4096;
 		PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition(
 			ResultPartitionType.PIPELINED,
@@ -504,8 +536,8 @@ public class LocalInputChannelTest {
 			true,
 			bufferSize);
 		ResultSubpartition subpartition = parent.getAllPartitions()[0];
-		if (addBuffer) {
-			subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
+		for (BufferConsumer buffer : buffers) {
+			subpartition.add(buffer);
 		}
 		return subpartition.createReadView(() -> {});
 	}
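
Editor's note on the LocalInputChannel change in this commit: the fix is to run `checkForBarrier` on every buffer rather than only on priority ones, so that an aligned (non-priority) barrier is also recorded before a later `checkpointStarted` call. The sketch below models that ordering. It is a simplified, hypothetical model: `Item`, `Persister`, the `trackOnlyPriorityEvents` flag, and the reduced `startPersisting` logic are assumptions made for illustration and do not reproduce the real `ChannelStatePersister`.

```java
import java.util.ArrayList;
import java.util.List;

final class BarrierTrackingSketch {

    enum Status { COMPLETED, BARRIER_PENDING, BARRIER_RECEIVED }

    /** Hypothetical buffer: either a data buffer or a checkpoint barrier event. */
    static final class Item {
        final boolean barrier;
        final boolean priority;
        final long barrierId;

        Item(boolean barrier, boolean priority, long barrierId) {
            this.barrier = barrier;
            this.priority = priority;
            this.barrierId = barrierId;
        }

        static Item data() {
            return new Item(false, false, -1L);
        }

        /** Aligned barriers travel as regular (non-priority) events in this model. */
        static Item alignedBarrier(long id) {
            return new Item(true, false, id);
        }
    }

    /** Simplified model of the per-channel tracking state. */
    static final class Persister {
        Status status = Status.COMPLETED;
        long lastSeenBarrier = -1L;
        final List<Item> persisted = new ArrayList<>();

        void checkForBarrier(Item item) {
            if (item.barrier && item.barrierId >= lastSeenBarrier) {
                status = Status.BARRIER_RECEIVED;
                lastSeenBarrier = item.barrierId;
            }
        }

        /** Starts persisting only if the barrier for this checkpoint has not been seen yet. */
        void startPersisting(long checkpointId) {
            if (lastSeenBarrier < checkpointId) {
                status = Status.BARRIER_PENDING;
                lastSeenBarrier = checkpointId;
            }
        }

        void maybePersist(Item item) {
            if (status == Status.BARRIER_PENDING && !item.barrier) {
                persisted.add(item);
            }
        }
    }

    static void receive(Persister persister, Item item, boolean trackOnlyPriorityEvents) {
        if (trackOnlyPriorityEvents) {
            // behaviour before the commit: barriers are tracked only for priority buffers
            if (item.priority) {
                persister.checkForBarrier(item);
            } else {
                persister.maybePersist(item);
            }
        } else {
            // behaviour after the commit: every buffer is checked, then possibly persisted
            persister.checkForBarrier(item);
            persister.maybePersist(item);
        }
    }

    /** Replays: aligned barrier 1 arrives, checkpoint 1 starts, one data buffer arrives. */
    static int persistedAfterAlignedBarrier(boolean trackOnlyPriorityEvents) {
        Persister persister = new Persister();
        receive(persister, Item.alignedBarrier(1L), trackOnlyPriorityEvents);
        persister.startPersisting(1L);
        receive(persister, Item.data(), trackOnlyPriorityEvents);
        return persister.persisted.size();
    }

    public static void main(String[] args) {
        System.out.println("before: " + persistedAfterAlignedBarrier(true));
        System.out.println("after:  " + persistedAfterAlignedBarrier(false));
    }
}
```

In this model the old ordering persists one data buffer that arrived after the barrier, which corresponds to the "extra buffers may be added to state" symptom from the commit message; the new ordering persists none, matching the assertion in `testNoDataPersistedAfterReceivingAlignedBarrier`.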


[flink] 12/18: [FLINK-19681][checkpointing] Use converted barrier after disabling alignment

Posted by pn...@apache.org.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 002a584ed6f55508ab7266a27433a1b72b59ddee
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Nov 10 14:15:41 2020 +0100

    [FLINK-19681][checkpointing] Use converted barrier after disabling alignment
    
    Otherwise, further components (e.g. SubtaskCheckpointCoordinator) can
    get an AC barrier for the UC checkpoint.
---
 .../org/apache/flink/runtime/io/network/api/CheckpointBarrier.java    | 4 ++++
 .../org/apache/flink/streaming/runtime/io/AlternatingController.java  | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 2ff1632..8058f7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -117,4 +117,8 @@ public class CheckpointBarrier extends RuntimeEvent {
 	public boolean isCheckpoint() {
 		return !checkpointOptions.getCheckpointType().isSavepoint();
 	}
+
+	public CheckpointBarrier asUnaligned() {
+		return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toTimeouted());
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 90b79c4..06e6bdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -88,7 +88,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 
 		if (maybeTimedOut.isPresent()) {
 			if (activeController == alignedController) {
-				switchToUnaligned(channelInfo, barrier);
+				switchToUnaligned(channelInfo, maybeTimedOut.get());
 				return maybeTimedOut;
 			}
 			else {