You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:17 UTC

[flink] 08/16: [FLINK-12777][network] Refactor BarrierTracker to use the same code structure as BarrierBuffer

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52525884826cd0e99b2309d89d74fa270d51ec90
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat Jun 15 10:18:01 2019 +0200

    [FLINK-12777][network] Refactor BarrierTracker to use the same code structure as BarrierBuffer
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  |  52 ++++---
 .../streaming/runtime/io/BarrierDiscarder.java     | 105 --------------
 .../runtime/io/BufferOrEventSequence.java          |   2 +-
 .../runtime/io/CheckpointBarrierAligner.java       |  65 ++-------
 .../runtime/io/CheckpointBarrierDiscarder.java     |  74 ++++++++++
 .../runtime/io/CheckpointBarrierHandler.java       |  85 ++++++++---
 ...rTracker.java => CheckpointBarrierTracker.java} | 158 +++++++--------------
 ...rierHandler.java => CheckpointedInputGate.java} |   7 +-
 .../streaming/runtime/io/EmptyBufferStorage.java   |  73 ++++++++++
 .../streaming/runtime/io/InputProcessorUtil.java   |  11 +-
 .../streaming/runtime/io/StreamInputProcessor.java |   2 +-
 .../runtime/io/StreamTaskNetworkInput.java         |   6 +-
 .../runtime/io/StreamTwoInputProcessor.java        |   2 +-
 .../io/StreamTwoInputSelectableProcessor.java      |   5 +-
 .../runtime/io/BarrierBufferTestBase.java          |  32 ++---
 .../streaming/runtime/io/BarrierTrackerTest.java   |  11 +-
 16 files changed, 349 insertions(+), 341 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 1594389..8dcc005 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -38,7 +38,7 @@ import java.util.concurrent.CompletableFuture;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * The barrier buffer is {@link CheckpointedInputGate} that blocks inputs with barriers until
  * all inputs have received the barrier for a given checkpoint.
  *
  * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
@@ -46,11 +46,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * the blocks are released.
  */
 @Internal
-public class BarrierBuffer implements CheckpointBarrierHandler {
+public class BarrierBuffer implements CheckpointedInputGate {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
 
-	private final CheckpointBarrierAligner barrierAligner;
+	private final CheckpointBarrierHandler barrierHandler;
 
 	/** The gate that the buffer draws its input from. */
 	private final InputGate inputGate;
@@ -77,6 +77,21 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		this (inputGate, bufferStorage, "Testing: No task associated", null);
 	}
 
+	BarrierBuffer(
+			InputGate inputGate,
+			BufferStorage bufferStorage,
+			String taskName,
+			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+		this(
+			inputGate,
+			bufferStorage,
+			new CheckpointBarrierAligner(
+				inputGate.getNumberOfInputChannels(),
+				taskName,
+				toNotifyOnCheckpoint)
+		);
+	}
+
 	/**
 	 * Creates a new checkpoint stream aligner.
 	 *
@@ -86,20 +101,15 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 *
 	 * @param inputGate The input gate to draw the buffers and events from.
 	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
-	 * @param taskName The task name for logging.
-	 * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications.
+	 * @param barrierHandler Handler that controls which channels are blocked.
 	 */
 	BarrierBuffer(
 			InputGate inputGate,
 			BufferStorage bufferStorage,
-			String taskName,
-			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+			CheckpointBarrierHandler barrierHandler) {
 		this.inputGate = inputGate;
 		this.bufferStorage = checkNotNull(bufferStorage);
-		this.barrierAligner = new CheckpointBarrierAligner(
-			inputGate.getNumberOfInputChannels(),
-			taskName,
-			toNotifyOnCheckpoint);
+		this.barrierHandler = barrierHandler;
 	}
 
 	@Override
@@ -131,11 +141,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			}
 
 			BufferOrEvent bufferOrEvent = next.get();
-			if (barrierAligner.isBlocked(bufferOrEvent.getChannelIndex())) {
+			if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
 				// if the channel is blocked, we just store the BufferOrEvent
 				bufferStorage.add(bufferOrEvent);
 				if (bufferStorage.isFull()) {
-					barrierAligner.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
+					barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
 					bufferStorage.rollOver();
 				}
 			}
@@ -146,19 +156,19 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
 				if (!endOfInputGate) {
 					// process barriers only if there is a chance of the checkpoint completing
-					if (barrierAligner.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+					if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
 						bufferStorage.rollOver();
 					}
 				}
 			}
 			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
-				if (barrierAligner.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
+				if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
 					bufferStorage.rollOver();
 				}
 			}
 			else {
 				if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
-					if (barrierAligner.processEndOfPartition()) {
+					if (barrierHandler.processEndOfPartition()) {
 						bufferStorage.rollOver();
 					}
 				}
@@ -178,7 +188,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		} else {
 			// end of input stream. stream continues with the buffered data
 			endOfInputGate = true;
-			barrierAligner.releaseBlocksAndResetBarriers();
+			barrierHandler.releaseBlocksAndResetBarriers();
 			bufferStorage.rollOver();
 			return pollNext();
 		}
@@ -208,13 +218,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 *
 	 * @return The ID of the pending of completed checkpoint.
 	 */
-	public long getCurrentCheckpointId() {
-		return barrierAligner.getCurrentCheckpointId();
+	public long getLatestCheckpointId() {
+		return barrierHandler.getLatestCheckpointId();
 	}
 
 	@Override
 	public long getAlignmentDurationNanos() {
-		return barrierAligner.getAlignmentDurationNanos();
+		return barrierHandler.getAlignmentDurationNanos();
 	}
 
 	@Override
@@ -228,6 +238,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	@Override
 	public String toString() {
-		return barrierAligner.toString();
+		return barrierHandler.toString();
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
deleted file mode 100644
index c33c940..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The BarrierDiscarder discards checkpoint barriers have been received from which input channels.
- */
-@Internal
-public class BarrierDiscarder implements CheckpointBarrierHandler {
-
-	// ------------------------------------------------------------------------
-
-	/** The input gate, to draw the buffers and events from. */
-	private final InputGate inputGate;
-
-	/**
-	 * The number of channels. Once that many barriers have been received for a checkpoint, the
-	 * checkpoint is considered complete.
-	 */
-	private final int totalNumberOfInputChannels;
-
-	// ------------------------------------------------------------------------
-
-	public BarrierDiscarder(InputGate inputGate) {
-		this.inputGate = inputGate;
-		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
-	}
-
-	@Override
-	public CompletableFuture<?> isAvailable() {
-		return inputGate.isAvailable();
-	}
-
-	@Override
-	public boolean isFinished() {
-		return inputGate.isFinished();
-	}
-
-	@Override
-	public Optional<BufferOrEvent> pollNext() throws Exception {
-		while (true) {
-			Optional<BufferOrEvent> next = inputGate.pollNext();
-			if (!next.isPresent()) {
-				// buffer or input exhausted
-				return next;
-			}
-
-			BufferOrEvent bufferOrEvent = next.get();
-			if (bufferOrEvent.isBuffer()) {
-				return next;
-			}
-			else if (bufferOrEvent.getEvent().getClass() != CheckpointBarrier.class &&
-				bufferOrEvent.getEvent().getClass() != CancelCheckpointMarker.class) {
-				// some other event
-				return next;
-			}
-		}
-	}
-
-	@Override
-	public void cleanup() {
-
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return true;
-	}
-
-	@Override
-	public long getAlignmentDurationNanos() {
-		// this one does not do alignment at all
-		return 0L;
-	}
-
-	@Override
-	public int getNumberOfInputChannels() {
-		return totalNumberOfInputChannels;
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
index c5bde1b..21649bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 
 /**
  * This class represents a sequence of buffers and events which are blocked by
- * {@link CheckpointBarrierHandler}. The sequence of buffers and events can be
+ * {@link CheckpointedInputGate}. The sequence of buffers and events can be
  * read back using the method {@link #getNext()}.
  */
 @Internal
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index 30e05c1..482ba65 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -40,7 +38,7 @@ import java.io.IOException;
  * release blocked channels.
  */
 @Internal
-public class CheckpointBarrierAligner {
+public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierAligner.class);
 
@@ -52,9 +50,6 @@ public class CheckpointBarrierAligner {
 
 	private final String taskName;
 
-	@Nullable
-	private final AbstractInvokable toNotifyOnCheckpoint;
-
 	/** The ID of the checkpoint for which we expect barriers. */
 	private long currentCheckpointId = -1L;
 
@@ -77,13 +72,14 @@ public class CheckpointBarrierAligner {
 			int totalNumberOfInputChannels,
 			String taskName,
 			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+		super(toNotifyOnCheckpoint);
 		this.totalNumberOfInputChannels = totalNumberOfInputChannels;
 		this.taskName = taskName;
-		this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 
 		this.blockedChannels = new boolean[totalNumberOfInputChannels];
 	}
 
+	@Override
 	public void releaseBlocksAndResetBarriers() throws IOException {
 		LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
 
@@ -100,19 +96,12 @@ public class CheckpointBarrierAligner {
 		}
 	}
 
-	/**
-	 * Checks whether the channel with the given index is blocked.
-	 *
-	 * @param channelIndex The channel index to check.
-	 * @return True if the channel is blocked, false if not.
-	 */
+	@Override
 	public boolean isBlocked(int channelIndex) {
 		return blockedChannels[channelIndex];
 	}
 
-	/**
-	 * @return true if some blocked data should be unblocked/rolled over.
-	 */
+	@Override
 	public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
@@ -121,7 +110,7 @@ public class CheckpointBarrierAligner {
 			if (barrierId > currentCheckpointId) {
 				// new checkpoint
 				currentCheckpointId = barrierId;
-				notifyCheckpoint(receivedBarrier, bufferedBytes);
+				notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
 			}
 			return false;
 		}
@@ -185,7 +174,7 @@ public class CheckpointBarrierAligner {
 			}
 
 			releaseBlocksAndResetBarriers();
-			notifyCheckpoint(receivedBarrier, bufferedBytes);
+			notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
 			return true;
 		}
 		return checkpointAborted;
@@ -222,9 +211,7 @@ public class CheckpointBarrierAligner {
 		}
 	}
 
-	/**
-	 * @return true if some blocked data should be unblocked/rolled over.
-	 */
+	@Override
 	public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
 		final long barrierId = cancelBarrier.getCheckpointId();
 
@@ -300,9 +287,7 @@ public class CheckpointBarrierAligner {
 		return false;
 	}
 
-	/**
-	 * @return true if some blocked data should be unblocked/rolled over.
-	 */
+	@Override
 	public boolean processEndOfPartition() throws Exception {
 		numClosedChannels++;
 
@@ -317,37 +302,12 @@ public class CheckpointBarrierAligner {
 		return false;
 	}
 
-	private void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes) throws Exception {
-		if (toNotifyOnCheckpoint != null) {
-			CheckpointMetaData checkpointMetaData =
-				new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
-
-			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
-				.setBytesBufferedInAlignment(bufferedBytes)
-				.setAlignmentDurationNanos(latestAlignmentDurationNanos);
-
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-				checkpointMetaData,
-				checkpointBarrier.getCheckpointOptions(),
-				checkpointMetrics);
-		}
-	}
-
-	private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
-		notifyAbort(checkpointId,
-			new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
-	}
-
-	private void notifyAbort(long checkpointId, CheckpointException cause) throws Exception {
-		if (toNotifyOnCheckpoint != null) {
-			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
-		}
-	}
-
-	public long getCurrentCheckpointId() {
+	@Override
+	public long getLatestCheckpointId() {
 		return currentCheckpointId;
 	}
 
+	@Override
 	public long getAlignmentDurationNanos() {
 		if (startOfAlignmentTimestamp <= 0) {
 			return latestAlignmentDurationNanos;
@@ -365,6 +325,7 @@ public class CheckpointBarrierAligner {
 			numClosedChannels);
 	}
 
+	@Override
 	public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
 		releaseBlocksAndResetBarriers();
 		notifyAbort(currentCheckpointId,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
new file mode 100644
index 0000000..4c6cdab
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * The {@link CheckpointBarrierDiscarder} discards the checkpoint barriers that have been received from the input channels.
+ */
+@Internal
+public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler {
+	public CheckpointBarrierDiscarder() {
+		super(null);
+	}
+
+	@Override
+	public void releaseBlocksAndResetBarriers() throws IOException {
+	}
+
+	@Override
+	public boolean isBlocked(int channelIndex) {
+		return false;
+	}
+
+	@Override
+	public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
+		return false;
+	}
+
+	@Override
+	public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+		return false;
+	}
+
+	@Override
+	public boolean processEndOfPartition() throws Exception {
+		return false;
+	}
+
+	@Override
+	public long getLatestCheckpointId() {
+		return 0;
+	}
+
+	@Override
+	public long getAlignmentDurationNanos() {
+		return 0;
+	}
+
+	@Override
+	public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
+
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 2ee1a97..41e043e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -18,43 +18,88 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.AsyncDataInput;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
 /**
- * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels.
+ * The {@link CheckpointBarrierHandler} reacts to checkpoint barrier arriving from the input channels.
  * Different implementations may either simply track barriers, or block certain inputs on
  * barriers.
  */
-@Internal
-public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> {
+public abstract class CheckpointBarrierHandler {
+
+	/** The listener to be notified on complete checkpoints. */
+	@Nullable
+	private final AbstractInvokable toNotifyOnCheckpoint;
+
+	public CheckpointBarrierHandler(@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+		this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
+	}
+
+	public abstract void releaseBlocksAndResetBarriers() throws IOException;
+
 	/**
-	 * Cleans up all internally held resources.
+	 * Checks whether the channel with the given index is blocked.
 	 *
-	 * @throws IOException Thrown if the cleanup of I/O resources failed.
+	 * @param channelIndex The channel index to check.
+	 * @return True if the channel is blocked, false if not.
 	 */
-	void cleanup() throws IOException;
+	public abstract boolean isBlocked(int channelIndex);
 
 	/**
-	 * Checks if the barrier handler has buffered any data internally.
-	 * @return {@code True}, if no data is buffered internally, {@code false} otherwise.
+	 * @return true if some blocked data should be unblocked/rolled over.
 	 */
-	boolean isEmpty();
+	public abstract boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception;
 
 	/**
-	 * Gets the time that the latest alignment took, in nanoseconds.
-	 * If there is currently an alignment in progress, it will return the time spent in the
-	 * current alignment so far.
-	 *
-	 * @return The duration in nanoseconds
+	 * @return true if some blocked data should be unblocked/rolled over.
 	 */
-	long getAlignmentDurationNanos();
+	public abstract boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception;
 
 	/**
-	 * @return number of underlying input channels.
+	 * @return true if some blocked data should be unblocked/rolled over.
 	 */
-	int getNumberOfInputChannels();
+	public abstract boolean processEndOfPartition() throws Exception;
+
+	public abstract long getLatestCheckpointId();
+
+	public abstract long getAlignmentDurationNanos();
+
+	public abstract void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception;
+
+	protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes, long alignmentDurationNanos) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			CheckpointMetaData checkpointMetaData =
+				new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+
+			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+				.setBytesBufferedInAlignment(bufferedBytes)
+				.setAlignmentDurationNanos(alignmentDurationNanos);
+
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+				checkpointMetaData,
+				checkpointBarrier.getCheckpointOptions(),
+				checkpointMetrics);
+		}
+	}
+
+	protected void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
+		notifyAbort(checkpointId,
+			new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
+	}
+
+	protected void notifyAbort(long checkpointId, CheckpointException cause) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
similarity index 61%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
index f7629bb..0ec9004 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
@@ -21,13 +21,8 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import org.slf4j.Logger;
@@ -36,24 +31,22 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 /**
- * The BarrierTracker keeps track of what checkpoint barriers have been received from
+ * The {@link CheckpointBarrierTracker} keeps track of what checkpoint barriers have been received from
  * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
  * it notifies its listener of a completed checkpoint.
  *
- * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
+ * <p>Unlike the {@link CheckpointBarrierAligner}, the {@link CheckpointBarrierTracker} does not block the input
  * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
  * guarantees. It can, however, be used to gain "at least once" processing guarantees.
  *
  * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
  */
 @Internal
-public class BarrierTracker implements CheckpointBarrierHandler {
+public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
 
-	private static final Logger LOG = LoggerFactory.getLogger(BarrierTracker.class);
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierTracker.class);
 
 	/**
 	 * The tracker tracks a maximum number of checkpoints, for which some, but not all barriers
@@ -63,9 +56,6 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 
 	// ------------------------------------------------------------------------
 
-	/** The input gate, to draw the buffers and events from. */
-	private final InputGate inputGate;
-
 	/**
 	 * The number of channels. Once that many barriers have been received for a checkpoint, the
 	 * checkpoint is considered complete.
@@ -78,89 +68,36 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	 */
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 
-	/** The listener to be notified on complete checkpoints. */
-	private final AbstractInvokable toNotifyOnCheckpoint;
-
 	/** The highest checkpoint ID encountered so far. */
 	private long latestPendingCheckpointID = -1;
 
-	// ------------------------------------------------------------------------
-
-	public BarrierTracker(InputGate inputGate) {
-		this(inputGate, null);
+	public CheckpointBarrierTracker(int totalNumberOfInputChannels) {
+		this(totalNumberOfInputChannels, null);
 	}
 
-	public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
-		this.inputGate = inputGate;
-		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+	public CheckpointBarrierTracker(int totalNumberOfInputChannels, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+		super(toNotifyOnCheckpoint);
+		this.totalNumberOfInputChannels = totalNumberOfInputChannels;
 		this.pendingCheckpoints = new ArrayDeque<>();
-		this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 	}
 
 	@Override
-	public CompletableFuture<?> isAvailable() {
-		return inputGate.isAvailable();
+	public void releaseBlocksAndResetBarriers() {
 	}
 
 	@Override
-	public boolean isFinished() {
-		return inputGate.isFinished();
+	public boolean isBlocked(int channelIndex) {
+		return false;
 	}
 
 	@Override
-	public Optional<BufferOrEvent> pollNext() throws Exception {
-		while (true) {
-			Optional<BufferOrEvent> next = inputGate.pollNext();
-			if (!next.isPresent()) {
-				// buffer or input exhausted
-				return next;
-			}
-
-			BufferOrEvent bufferOrEvent = next.get();
-			if (bufferOrEvent.isBuffer()) {
-				return next;
-			}
-			else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
-				processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
-			}
-			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
-				processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
-			}
-			else {
-				// some other event
-				return next;
-			}
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		pendingCheckpoints.clear();
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return pendingCheckpoints.isEmpty();
-	}
-
-	@Override
-	public long getAlignmentDurationNanos() {
-		// this one does not do alignment at all
-		return 0L;
-	}
-
-	@Override
-	public int getNumberOfInputChannels() {
-		return totalNumberOfInputChannels;
-	}
-
-	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
+	public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
-			return;
+			notifyCheckpoint(receivedBarrier, 0, 0);
+			return false;
 		}
 
 		// general path for multiple input channels
@@ -169,20 +106,20 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		}
 
 		// find the checkpoint barrier in the queue of pending barriers
-		CheckpointBarrierCount cbc = null;
+		CheckpointBarrierCount barrierCount = null;
 		int pos = 0;
 
 		for (CheckpointBarrierCount next : pendingCheckpoints) {
 			if (next.checkpointId == barrierId) {
-				cbc = next;
+				barrierCount = next;
 				break;
 			}
 			pos++;
 		}
 
-		if (cbc != null) {
+		if (barrierCount != null) {
 			// add one to the count to that barrier and check for completion
-			int numBarriersNew = cbc.incrementBarrierCount();
+			int numBarriersNew = barrierCount.incrementBarrierCount();
 			if (numBarriersNew == totalNumberOfInputChannels) {
 				// checkpoint can be triggered (or is aborted and all barriers have been seen)
 				// first, remove this checkpoint and all all prior pending
@@ -192,12 +129,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				}
 
 				// notify the listener
-				if (!cbc.isAborted()) {
+				if (!barrierCount.isAborted()) {
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("Received all barriers for checkpoint {}", barrierId);
 					}
 
-					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
+					notifyCheckpoint(receivedBarrier, 0, 0);
 				}
 			}
 		}
@@ -216,19 +153,21 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				}
 			}
 		}
+		return false;
 	}
 
-	private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier, int channelIndex) throws Exception {
-		final long checkpointId = barrier.getCheckpointId();
+	@Override
+	public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+		final long checkpointId = cancelBarrier.getCheckpointId();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Received cancellation barrier for checkpoint {} from channel {}", checkpointId, channelIndex);
+			LOG.debug("Received cancellation barrier for checkpoint {}", checkpointId);
 		}
 
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			notifyAbort(checkpointId);
-			return;
+			notifyAbortOnCancellationBarrier(checkpointId);
+			return false;
 		}
 
 		// -- general path for multiple input channels --
@@ -241,7 +180,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 
 			if (cbc.markAborted()) {
 				// abort the subsumed checkpoints if not already done
-				notifyAbort(cbc.checkpointId());
+				notifyAbortOnCancellationBarrier(cbc.checkpointId());
 			}
 		}
 
@@ -249,7 +188,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 			// make sure the checkpoint is remembered as aborted
 			if (cbc.markAborted()) {
 				// this was the first time the checkpoint was aborted - notify
-				notifyAbort(checkpointId);
+				notifyAbortOnCancellationBarrier(checkpointId);
 			}
 
 			// we still count the barriers to be able to remove the entry once all barriers have been seen
@@ -259,7 +198,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 			}
 		}
 		else if (checkpointId > latestPendingCheckpointID) {
-			notifyAbort(checkpointId);
+			notifyAbortOnCancellationBarrier(checkpointId);
 
 			latestPendingCheckpointID = checkpointId;
 
@@ -272,28 +211,33 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		} else {
 			// trailing cancellation barrier which was already cancelled
 		}
+		return false;
 	}
 
-	private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
-		if (toNotifyOnCheckpoint != null) {
-			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
-			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
-				.setBytesBufferedInAlignment(0L)
-				.setAlignmentDurationNanos(0L);
-
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);
+	@Override
+	public boolean processEndOfPartition() throws Exception {
+		while (!pendingCheckpoints.isEmpty()) {
+			CheckpointBarrierCount barrierCount = pendingCheckpoints.removeFirst();
+			if (barrierCount.markAborted()) {
+				notifyAbort(barrierCount.checkpointId(),
+					new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+			}
 		}
+		return false;
 	}
 
-	private void notifyAbort(long checkpointId) throws Exception {
-		if (toNotifyOnCheckpoint != null) {
-			toNotifyOnCheckpoint.abortCheckpointOnBarrier(
-				checkpointId,
-				new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
-		}
+	public long getLatestCheckpointId() {
+		return pendingCheckpoints.isEmpty() ? -1 : pendingCheckpoints.peekLast().checkpointId();
 	}
 
-	// ------------------------------------------------------------------------
+	public long getAlignmentDurationNanos() {
+		return 0;
+	}
+
+	@Override
+	public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception { // not expected to be called: this handler reports no channel as blocked
+		throw new UnsupportedOperationException("This should never happen as this class doesn't block any data");
+	}
 
 	/**
 	 * Simple class for a checkpoint ID with a barrier counter.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
similarity index 84%
copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 2ee1a97..cdbbfbc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -25,12 +25,11 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import java.io.IOException;
 
 /**
- * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels.
- * Different implementations may either simply track barriers, or block certain inputs on
- * barriers.
+ * A {@link CheckpointedInputGate} uses a {@link CheckpointBarrierHandler} to handle the
+ * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier}s arriving from an {@link org.apache.flink.runtime.io.network.partition.consumer.InputGate}.
  */
 @Internal
-public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> {
+public interface CheckpointedInputGate extends AsyncDataInput<BufferOrEvent> {
 	/**
 	 * Cleans up all internally held resources.
 	 *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java
new file mode 100644
index 0000000..5535c49
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Always empty implementation of {@link BufferStorage}. It doesn't allow for adding any data.
+ */
+@Internal
+public class EmptyBufferStorage implements BufferStorage {
+	@Override
+	public void add(BufferOrEvent boe) throws IOException {
+		throw new UnsupportedOperationException("Adding to EmptyBufferStorage is unsupported");
+	}
+
+	@Override
+	public boolean isFull() {
+		return false;
+	}
+
+	@Override
+	public void rollOver() throws IOException {
+	}
+
+	@Override
+	public long getPendingBytes() {
+		return 0;
+	}
+
+	@Override
+	public long getRolledBytes() {
+		return 0;
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return true;
+	}
+
+	@Override
+	public Optional<BufferOrEvent> pollNext() throws IOException {
+		return Optional.empty();
+	}
+
+	@Override
+	public long getMaxBufferedBytes() {
+		return -1;
+	}
+
+	@Override
+	public void close() throws IOException {
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index c9ec6bf..75926b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -30,13 +30,13 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import java.io.IOException;
 
 /**
- * Utility for creating {@link CheckpointBarrierHandler} based on checkpoint mode
+ * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
  * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
  */
 @Internal
 public class InputProcessorUtil {
 
-	public static CheckpointBarrierHandler createCheckpointBarrierHandler(
+	public static CheckpointedInputGate createCheckpointBarrierHandler(
 			StreamTask<?, ?> checkpointedTask,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
@@ -44,7 +44,7 @@ public class InputProcessorUtil {
 			Configuration taskManagerConfig,
 			String taskName) throws IOException {
 
-		CheckpointBarrierHandler barrierHandler;
+		CheckpointedInputGate barrierHandler;
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
 			long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
 			if (!(maxAlign == -1 || maxAlign > 0)) {
@@ -67,7 +67,10 @@ public class InputProcessorUtil {
 					checkpointedTask);
 			}
 		} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-			barrierHandler = new BarrierTracker(inputGate);
+			barrierHandler = new BarrierBuffer(
+				inputGate,
+				new EmptyBufferStorage(),
+				new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask));
 		} else {
 			throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 742dbe8..58b2051 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -104,7 +104,7 @@ public class StreamInputProcessor<IN> {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
-		CheckpointBarrierHandler barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
+		CheckpointedInputGate barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
 			checkpointedTask,
 			checkpointMode,
 			ioManager,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 85e7f46..ecf88e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -42,12 +42,12 @@ import java.util.concurrent.CompletableFuture;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Implementation of {@link StreamTaskInput} that wraps an input from network taken from {@link CheckpointBarrierHandler}.
+ * Implementation of {@link StreamTaskInput} that wraps network input taken from a {@link CheckpointedInputGate}.
  */
 @Internal
 public final class StreamTaskNetworkInput implements StreamTaskInput {
 
-	private final CheckpointBarrierHandler barrierHandler;
+	private final CheckpointedInputGate barrierHandler;
 
 	private final DeserializationDelegate<StreamElement> deserializationDelegate;
 
@@ -63,7 +63,7 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
 
 	@SuppressWarnings("unchecked")
 	public StreamTaskNetworkInput(
-			CheckpointBarrierHandler barrierHandler,
+			CheckpointedInputGate barrierHandler,
 			TypeSerializer<?> inputSerializer,
 			IOManager ioManager,
 			int inputIndex) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 4efbc7f..aa6354d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -86,7 +86,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
 	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
 
-	private final CheckpointBarrierHandler barrierHandler;
+	private final CheckpointedInputGate barrierHandler;
 
 	private final Object lock;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 54ce749..d5ebf29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -120,8 +120,9 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
 		InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));
 
 		// create a Input instance for each input
-		this.input1 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate1), inputSerializer1, ioManager, 0);
-		this.input2 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate2), inputSerializer2, ioManager, 1);
+		CachedBufferStorage bufferStorage = new CachedBufferStorage(unionedInputGate1.getPageSize());
+		this.input1 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
+		this.input2 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
 
 		this.statusWatermarkValve1 = new StatusWatermarkValve(
 			unionedInputGate1.getNumberOfInputChannels(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index e6e48a8..13c4aad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -483,7 +483,7 @@ public abstract class BarrierBufferTestBase {
 		// align checkpoint 1
 		startTs = System.nanoTime();
 		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getCurrentCheckpointId());
+		assertEquals(1L, buffer.getLatestCheckpointId());
 
 		// checkpoint done - replay buffered
 		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
@@ -501,7 +501,7 @@ public abstract class BarrierBufferTestBase {
 
 		// checkpoint 2 aborted, checkpoint 3 started
 		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(3L, buffer.getCurrentCheckpointId());
+		assertEquals(3L, buffer.getLatestCheckpointId());
 		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
 		verify(toNotify).abortCheckpointOnBarrier(eq(2L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
@@ -565,7 +565,7 @@ public abstract class BarrierBufferTestBase {
 		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getCurrentCheckpointId());
+		assertEquals(1L, buffer.getLatestCheckpointId());
 
 		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
@@ -574,7 +574,7 @@ public abstract class BarrierBufferTestBase {
 
 		// alignment of checkpoint 2
 		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getCurrentCheckpointId());
+		assertEquals(2L, buffer.getLatestCheckpointId());
 		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
@@ -583,7 +583,7 @@ public abstract class BarrierBufferTestBase {
 
 		// checkpoint 2 aborted, checkpoint 4 started. replay buffered
 		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(4L, buffer.getCurrentCheckpointId());
+		assertEquals(4L, buffer.getLatestCheckpointId());
 		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
@@ -651,7 +651,7 @@ public abstract class BarrierBufferTestBase {
 		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getCurrentCheckpointId());
+		assertEquals(1L, buffer.getLatestCheckpointId());
 		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
@@ -660,7 +660,7 @@ public abstract class BarrierBufferTestBase {
 		// alignment of checkpoint 2
 		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getCurrentCheckpointId());
+		assertEquals(2L, buffer.getLatestCheckpointId());
 
 		// checkpoint 2 completed
 		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
@@ -669,7 +669,7 @@ public abstract class BarrierBufferTestBase {
 
 		// checkpoint 3 skipped, alignment for 4 started
 		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(4L, buffer.getCurrentCheckpointId());
+		assertEquals(4L, buffer.getLatestCheckpointId());
 		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
@@ -790,24 +790,24 @@ public abstract class BarrierBufferTestBase {
 
 		// checkpoint 3 alignment
 		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getCurrentCheckpointId());
+		assertEquals(2L, buffer.getLatestCheckpointId());
 		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 buffered
 		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(3L, buffer.getCurrentCheckpointId());
+		assertEquals(3L, buffer.getLatestCheckpointId());
 
 		// after checkpoint 4
 		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(4L, buffer.getCurrentCheckpointId());
+		assertEquals(4L, buffer.getLatestCheckpointId());
 		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
 
 		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(5L, buffer.getCurrentCheckpointId());
+		assertEquals(5L, buffer.getLatestCheckpointId());
 		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
 	}
 
@@ -837,11 +837,11 @@ public abstract class BarrierBufferTestBase {
 		check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getCurrentCheckpointId());
+		assertEquals(1L, buffer.getLatestCheckpointId());
 
 		// alignment of second checkpoint
 		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getCurrentCheckpointId());
+		assertEquals(2L, buffer.getLatestCheckpointId());
 
 		// first end-of-partition encountered: checkpoint will not be completed
 		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
@@ -874,7 +874,7 @@ public abstract class BarrierBufferTestBase {
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(5L, buffer.getCurrentCheckpointId());
+		assertEquals(5L, buffer.getLatestCheckpointId());
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
@@ -882,7 +882,7 @@ public abstract class BarrierBufferTestBase {
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(6L, buffer.getCurrentCheckpointId());
+		assertEquals(6L, buffer.getLatestCheckpointId());
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index a8e7727..5112f63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -46,7 +46,7 @@ public class BarrierTrackerTest {
 
 	private static final int PAGE_SIZE = 512;
 
-	private BarrierTracker tracker;
+	private CheckpointedInputGate tracker;
 
 	@After
 	public void ensureEmpty() throws Exception {
@@ -365,16 +365,19 @@ public class BarrierTrackerTest {
 	// ------------------------------------------------------------------------
 	//  Utils
 	// ------------------------------------------------------------------------
-	private static BarrierTracker createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
+	private static CheckpointedInputGate createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
 		return createBarrierTracker(numberOfChannels, sequence, null);
 	}
 
-	private static BarrierTracker createBarrierTracker(
+	private static CheckpointedInputGate createBarrierTracker(
 			int numberOfChannels,
 			BufferOrEvent[] sequence,
 			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
-		return new BarrierTracker(gate, toNotifyOnCheckpoint);
+		return new BarrierBuffer(
+			gate,
+			new CachedBufferStorage(PAGE_SIZE, -1, "Testing"),
+			new CheckpointBarrierTracker(gate.getNumberOfInputChannels(), toNotifyOnCheckpoint));
 	}
 
 	private static BufferOrEvent createBarrier(long id, int channel) {