You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:18 UTC

[flink] 09/16: [FLINK-12777][network] Rename existing classes to make them in sync with the refactor

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d502b6addffdc23a4826796c630bf7f9dbae718
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 18 10:41:11 2019 +0200

    [FLINK-12777][network] Rename existing classes to make them in sync with the refactor
    
    1. Rename BarrierBuffer to CheckpointedInputGate
    CheckpointedInputGate was an interface, while BarrierBuffer was
    it's implementation. This rename means that we are dropping the interface
    and keeping only the concrete class.
    
    2. Rename BarrierBuffer and BarrierTracker tests to match this rename
    and previous refactorings.
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 243 --------
 .../streaming/runtime/io/CachedBufferStorage.java  |   2 +-
 .../runtime/io/CheckpointedInputGate.java          | 228 ++++++-
 .../streaming/runtime/io/InputProcessorUtil.java   |  12 +-
 .../streaming/runtime/io/StreamInputProcessor.java |   2 +-
 .../runtime/io/StreamTaskNetworkInput.java         |  22 +-
 .../runtime/io/StreamTwoInputProcessor.java        |   2 +-
 .../io/StreamTwoInputSelectableProcessor.java      |   4 +-
 ...heckpointBarrierAlignerAlignmentLimitTest.java} |  15 +-
 ...CheckpointBarrierAlignerMassiveRandomTest.java} |   6 +-
 ....java => CheckpointBarrierAlignerTestBase.java} | 660 ++++++++++-----------
 ...Test.java => CheckpointBarrierTrackerTest.java} |  54 +-
 ...> CreditBasedCheckpointBarrierAlignerTest.java} |   8 +-
 ...a => SpillingCheckpointBarrierAlignerTest.java} |   8 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   7 +-
 15 files changed, 614 insertions(+), 659 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
deleted file mode 100644
index 8dcc005..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The barrier buffer is {@link CheckpointedInputGate} that blocks inputs with barriers until
- * all inputs have received the barrier for a given checkpoint.
- *
- * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
- * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
- * the blocks are released.
- */
-@Internal
-public class BarrierBuffer implements CheckpointedInputGate {
-
-	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
-	private final CheckpointBarrierHandler barrierHandler;
-
-	/** The gate that the buffer draws its input from. */
-	private final InputGate inputGate;
-
-	private final BufferStorage bufferStorage;
-
-	/** Flag to indicate whether we have drawn all available input. */
-	private boolean endOfInputGate;
-
-	/** Indicate end of the input. Set to true after encountering {@link #endOfInputGate} and depleting
-	 * {@link #bufferStorage}. */
-	private boolean isFinished;
-
-	/**
-	 * Creates a new checkpoint stream aligner.
-	 *
-	 * <p>There is no limit to how much data may be buffered during an alignment.
-	 *
-	 * @param inputGate The input gate to draw the buffers and events from.
-	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
-	 */
-	@VisibleForTesting
-	BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
-		this (inputGate, bufferStorage, "Testing: No task associated", null);
-	}
-
-	BarrierBuffer(
-			InputGate inputGate,
-			BufferStorage bufferStorage,
-			String taskName,
-			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
-		this(
-			inputGate,
-			bufferStorage,
-			new CheckpointBarrierAligner(
-				inputGate.getNumberOfInputChannels(),
-				taskName,
-				toNotifyOnCheckpoint)
-		);
-	}
-
-	/**
-	 * Creates a new checkpoint stream aligner.
-	 *
-	 * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
-	 * When that number is exceeded, it will stop the alignment and notify the task that the
-	 * checkpoint has been cancelled.
-	 *
-	 * @param inputGate The input gate to draw the buffers and events from.
-	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
-	 * @param barrierHandler Handler that controls which channels are blocked.
-	 */
-	BarrierBuffer(
-			InputGate inputGate,
-			BufferStorage bufferStorage,
-			CheckpointBarrierHandler barrierHandler) {
-		this.inputGate = inputGate;
-		this.bufferStorage = checkNotNull(bufferStorage);
-		this.barrierHandler = barrierHandler;
-	}
-
-	@Override
-	public CompletableFuture<?> isAvailable() {
-		if (bufferStorage.isEmpty()) {
-			return inputGate.isAvailable();
-		}
-		return AVAILABLE;
-	}
-
-	@Override
-	public Optional<BufferOrEvent> pollNext() throws Exception {
-		while (true) {
-			// process buffered BufferOrEvents before grabbing new ones
-			Optional<BufferOrEvent> next;
-			if (bufferStorage.isEmpty()) {
-				next = inputGate.pollNext();
-			}
-			else {
-				// TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking
-				next = bufferStorage.pollNext();
-				if (!next.isPresent()) {
-					return pollNext();
-				}
-			}
-
-			if (!next.isPresent()) {
-				return handleEmptyBuffer();
-			}
-
-			BufferOrEvent bufferOrEvent = next.get();
-			if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
-				// if the channel is blocked, we just store the BufferOrEvent
-				bufferStorage.add(bufferOrEvent);
-				if (bufferStorage.isFull()) {
-					barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
-					bufferStorage.rollOver();
-				}
-			}
-			else if (bufferOrEvent.isBuffer()) {
-				return next;
-			}
-			else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
-				CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
-				if (!endOfInputGate) {
-					// process barriers only if there is a chance of the checkpoint completing
-					if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
-						bufferStorage.rollOver();
-					}
-				}
-			}
-			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
-				if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
-					bufferStorage.rollOver();
-				}
-			}
-			else {
-				if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
-					if (barrierHandler.processEndOfPartition()) {
-						bufferStorage.rollOver();
-					}
-				}
-				return next;
-			}
-		}
-	}
-
-	private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
-		if (!inputGate.isFinished()) {
-			return Optional.empty();
-		}
-
-		if (endOfInputGate) {
-			isFinished = true;
-			return Optional.empty();
-		} else {
-			// end of input stream. stream continues with the buffered data
-			endOfInputGate = true;
-			barrierHandler.releaseBlocksAndResetBarriers();
-			bufferStorage.rollOver();
-			return pollNext();
-		}
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return bufferStorage.isEmpty();
-	}
-
-	@Override
-	public boolean isFinished() {
-		return isFinished;
-	}
-
-	@Override
-	public void cleanup() throws IOException {
-		bufferStorage.close();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the ID defining the current pending, or just completed, checkpoint.
-	 *
-	 * @return The ID of the pending of completed checkpoint.
-	 */
-	public long getLatestCheckpointId() {
-		return barrierHandler.getLatestCheckpointId();
-	}
-
-	@Override
-	public long getAlignmentDurationNanos() {
-		return barrierHandler.getAlignmentDurationNanos();
-	}
-
-	@Override
-	public int getNumberOfInputChannels() {
-		return inputGate.getNumberOfInputChannels();
-	}
-
-	// ------------------------------------------------------------------------
-	// Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return barrierHandler.toString();
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
index 628a69c..4927c35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
@@ -118,7 +118,7 @@ public class CachedBufferStorage extends AbstractBufferStorage {
 	 */
 	public static class CachedBufferOrEventSequence implements BufferOrEventSequence {
 
-		/** The sequence of buffers and events to be consumed by {@link BarrierBuffer}.*/
+		/** The sequence of buffers and events to be consumed by {@link CheckpointedInputGate}.*/
 		private final ArrayDeque<BufferOrEvent> queuedBuffers;
 
 		/** The total size of the cached data. */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index cdbbfbc..7604d0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,29 +18,213 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.AsyncDataInput;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The {@link CheckpointedInputGate} uses {@link CheckpointBarrierHandler} to handle incoming
- * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier} from the {@link org.apache.flink.runtime.io.network.partition.consumer.InputGate}.
+ * {@link CheckpointBarrier} from the {@link InputGate}.
  */
 @Internal
-public interface CheckpointedInputGate extends AsyncDataInput<BufferOrEvent> {
+public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointedInputGate.class);
+
+	private final CheckpointBarrierHandler barrierHandler;
+
+	/** The gate that the buffer draws its input from. */
+	private final InputGate inputGate;
+
+	private final BufferStorage bufferStorage;
+
+	/** Flag to indicate whether we have drawn all available input. */
+	private boolean endOfInputGate;
+
+	/** Indicate end of the input. Set to true after encountering {@link #endOfInputGate} and depleting
+	 * {@link #bufferStorage}. */
+	private boolean isFinished;
+
 	/**
-	 * Cleans up all internally held resources.
+	 * Creates a new checkpoint stream aligner.
 	 *
-	 * @throws IOException Thrown if the cleanup of I/O resources failed.
+	 * <p>There is no limit to how much data may be buffered during an alignment.
+	 *
+	 * @param inputGate The input gate to draw the buffers and events from.
+	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
 	 */
-	void cleanup() throws IOException;
+	@VisibleForTesting
+	CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage) {
+		this (inputGate, bufferStorage, "Testing: No task associated", null);
+	}
+
+	public CheckpointedInputGate(
+			InputGate inputGate,
+			BufferStorage bufferStorage,
+			String taskName,
+			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+		this(
+			inputGate,
+			bufferStorage,
+			new CheckpointBarrierAligner(
+				inputGate.getNumberOfInputChannels(),
+				taskName,
+				toNotifyOnCheckpoint)
+		);
+	}
+
+	/**
+	 * Creates a new checkpoint stream aligner.
+	 *
+	 * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
+	 * When that number is exceeded, it will stop the alignment and notify the task that the
+	 * checkpoint has been cancelled.
+	 *
+	 * @param inputGate The input gate to draw the buffers and events from.
+	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
+	 * @param barrierHandler Handler that controls which channels are blocked.
+	 */
+	public CheckpointedInputGate(
+			InputGate inputGate,
+			BufferStorage bufferStorage,
+			CheckpointBarrierHandler barrierHandler) {
+		this.inputGate = inputGate;
+		this.bufferStorage = checkNotNull(bufferStorage);
+		this.barrierHandler = barrierHandler;
+	}
+
+	@Override
+	public CompletableFuture<?> isAvailable() {
+		if (bufferStorage.isEmpty()) {
+			return inputGate.isAvailable();
+		}
+		return AVAILABLE;
+	}
+
+	@Override
+	public Optional<BufferOrEvent> pollNext() throws Exception {
+		while (true) {
+			// process buffered BufferOrEvents before grabbing new ones
+			Optional<BufferOrEvent> next;
+			if (bufferStorage.isEmpty()) {
+				next = inputGate.pollNext();
+			}
+			else {
+				// TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking
+				next = bufferStorage.pollNext();
+				if (!next.isPresent()) {
+					return pollNext();
+				}
+			}
+
+			if (!next.isPresent()) {
+				return handleEmptyBuffer();
+			}
+
+			BufferOrEvent bufferOrEvent = next.get();
+			if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
+				// if the channel is blocked, we just store the BufferOrEvent
+				bufferStorage.add(bufferOrEvent);
+				if (bufferStorage.isFull()) {
+					barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
+					bufferStorage.rollOver();
+				}
+			}
+			else if (bufferOrEvent.isBuffer()) {
+				return next;
+			}
+			else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
+				CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
+				if (!endOfInputGate) {
+					// process barriers only if there is a chance of the checkpoint completing
+					if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+						bufferStorage.rollOver();
+					}
+				}
+			}
+			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
+				if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
+					bufferStorage.rollOver();
+				}
+			}
+			else {
+				if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
+					if (barrierHandler.processEndOfPartition()) {
+						bufferStorage.rollOver();
+					}
+				}
+				return next;
+			}
+		}
+	}
+
+	private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
+		if (!inputGate.isFinished()) {
+			return Optional.empty();
+		}
+
+		if (endOfInputGate) {
+			isFinished = true;
+			return Optional.empty();
+		} else {
+			// end of input stream. stream continues with the buffered data
+			endOfInputGate = true;
+			barrierHandler.releaseBlocksAndResetBarriers();
+			bufferStorage.rollOver();
+			return pollNext();
+		}
+	}
 
 	/**
 	 * Checks if the barrier handler has buffered any data internally.
 	 * @return {@code True}, if no data is buffered internally, {@code false} otherwise.
 	 */
-	boolean isEmpty();
+	public boolean isEmpty() {
+		return bufferStorage.isEmpty();
+	}
+
+	@Override
+	public boolean isFinished() {
+		return isFinished;
+	}
+
+	/**
+	 * Cleans up all internally held resources.
+	 *
+	 * @throws IOException Thrown if the cleanup of I/O resources failed.
+	 */
+	public void cleanup() throws IOException {
+		bufferStorage.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the ID defining the current pending, or just completed, checkpoint.
+	 *
+	 * @return The ID of the pending of completed checkpoint.
+	 */
+	public long getLatestCheckpointId() {
+		return barrierHandler.getLatestCheckpointId();
+	}
 
 	/**
 	 * Gets the time that the latest alignment took, in nanoseconds.
@@ -50,10 +233,23 @@ public interface CheckpointedInputGate extends AsyncDataInput<BufferOrEvent> {
 	 *
 	 * @return The duration in nanoseconds
 	 */
-	long getAlignmentDurationNanos();
+	public long getAlignmentDurationNanos() {
+		return barrierHandler.getAlignmentDurationNanos();
+	}
 
 	/**
 	 * @return number of underlying input channels.
 	 */
-	int getNumberOfInputChannels();
+	public int getNumberOfInputChannels() {
+		return inputGate.getNumberOfInputChannels();
+	}
+
+	// ------------------------------------------------------------------------
+	// Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return barrierHandler.toString();
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 75926b9..7eda06c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -36,7 +36,7 @@ import java.io.IOException;
 @Internal
 public class InputProcessorUtil {
 
-	public static CheckpointedInputGate createCheckpointBarrierHandler(
+	public static CheckpointedInputGate createCheckpointedInputGate(
 			StreamTask<?, ?> checkpointedTask,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
@@ -44,7 +44,7 @@ public class InputProcessorUtil {
 			Configuration taskManagerConfig,
 			String taskName) throws IOException {
 
-		CheckpointedInputGate barrierHandler;
+		CheckpointedInputGate checkpointedInputGate;
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
 			long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
 			if (!(maxAlign == -1 || maxAlign > 0)) {
@@ -54,20 +54,20 @@ public class InputProcessorUtil {
 			}
 
 			if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
-				barrierHandler = new BarrierBuffer(
+				checkpointedInputGate = new CheckpointedInputGate(
 					inputGate,
 					new CachedBufferStorage(inputGate.getPageSize(), maxAlign, taskName),
 					taskName,
 					checkpointedTask);
 			} else {
-				barrierHandler = new BarrierBuffer(
+				checkpointedInputGate = new CheckpointedInputGate(
 					inputGate,
 					new BufferSpiller(ioManager, inputGate.getPageSize(), maxAlign, taskName),
 					taskName,
 					checkpointedTask);
 			}
 		} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-			barrierHandler = new BarrierBuffer(
+			checkpointedInputGate = new CheckpointedInputGate(
 				inputGate,
 				new EmptyBufferStorage(),
 				new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask));
@@ -75,6 +75,6 @@ public class InputProcessorUtil {
 			throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
 		}
 
-		return barrierHandler;
+		return checkpointedInputGate;
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 58b2051..d6fcad2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -104,7 +104,7 @@ public class StreamInputProcessor<IN> {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
-		CheckpointedInputGate barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
+		CheckpointedInputGate barrierHandler = InputProcessorUtil.createCheckpointedInputGate(
 			checkpointedTask,
 			checkpointMode,
 			ioManager,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index ecf88e2..8c37141 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public final class StreamTaskNetworkInput implements StreamTaskInput {
 
-	private final CheckpointedInputGate barrierHandler;
+	private final CheckpointedInputGate checkpointedInputGate;
 
 	private final DeserializationDelegate<StreamElement> deserializationDelegate;
 
@@ -63,16 +63,16 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
 
 	@SuppressWarnings("unchecked")
 	public StreamTaskNetworkInput(
-			CheckpointedInputGate barrierHandler,
+			CheckpointedInputGate checkpointedInputGate,
 			TypeSerializer<?> inputSerializer,
 			IOManager ioManager,
 			int inputIndex) {
-		this.barrierHandler = barrierHandler;
+		this.checkpointedInputGate = checkpointedInputGate;
 		this.deserializationDelegate = new NonReusingDeserializationDelegate<>(
 			new StreamElementSerializer<>(inputSerializer));
 
 		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[barrierHandler.getNumberOfInputChannels()];
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
 		for (int i = 0; i < recordDeserializers.length; i++) {
 			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>(
 				ioManager.getSpillingDirectoriesPaths());
@@ -99,14 +99,14 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
 				}
 			}
 
-			Optional<BufferOrEvent> bufferOrEvent = barrierHandler.pollNext();
+			Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
 			if (bufferOrEvent.isPresent()) {
 				processBufferOrEvent(bufferOrEvent.get());
 			} else {
-				if (barrierHandler.isFinished()) {
+				if (checkpointedInputGate.isFinished()) {
 					isFinished = true;
-					checkState(barrierHandler.isAvailable().isDone(), "Finished BarrierHandler should be available");
-					if (!barrierHandler.isEmpty()) {
+					checkState(checkpointedInputGate.isAvailable().isDone(), "Finished BarrierHandler should be available");
+					if (!checkpointedInputGate.isEmpty()) {
 						throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
 					}
 				}
@@ -124,7 +124,7 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
 		else {
 			// Event received
 			final AbstractEvent event = bufferOrEvent.getEvent();
-			// TODO: with barrierHandler.isFinished() we might not need to support any events on this level.
+			// TODO: with checkpointedInputGate.isFinished() we might not need to support any events on this level.
 			if (event.getClass() != EndOfPartitionEvent.class) {
 				throw new IOException("Unexpected event: " + event);
 			}
@@ -148,7 +148,7 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
 
 	@Override
 	public CompletableFuture<?> isAvailable() {
-		return barrierHandler.isAvailable();
+		return checkpointedInputGate.isAvailable();
 	}
 
 	@Override
@@ -162,6 +162,6 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
 			deserializer.clear();
 		}
 
-		barrierHandler.cleanup();
+		checkpointedInputGate.cleanup();
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index aa6354d..f989e57 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -154,7 +154,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 
 		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
-		this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
+		this.barrierHandler = InputProcessorUtil.createCheckpointedInputGate(
 			checkpointedTask,
 			checkpointMode,
 			ioManager,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index d5ebf29..37c17db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -121,8 +121,8 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
 
 		// create a Input instance for each input
 		CachedBufferStorage bufferStorage = new CachedBufferStorage(unionedInputGate1.getPageSize());
-		this.input1 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
-		this.input2 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
+		this.input1 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
+		this.input2 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
 
 		this.statusWatermarkValve1 = new StatusWatermarkValve(
 			unionedInputGate1.getNumberOfInputChannels(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
similarity index 94%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 2eb3f5c..0621179 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -56,9 +57,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 /**
- * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
+ * Tests for the {@link CheckpointBarrierAligner}'s maximum limit of buffered/spilled bytes.
  */
-public class BarrierBufferAlignmentLimitTest {
+public class CheckpointBarrierAlignerAlignmentLimitTest {
 
 	private static final int PAGE_SIZE = 512;
 
@@ -116,7 +117,7 @@ public class BarrierBufferAlignmentLimitTest {
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		BarrierBuffer buffer = new BarrierBuffer(
+		CheckpointedInputGate buffer = new CheckpointedInputGate(
 			gate,
 			new BufferSpiller(ioManager, gate.getPageSize(), 1000),
 			"Testing",
@@ -139,7 +140,7 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[5], buffer.pollNext().get());
 		validateAlignmentTime(startTs, buffer);
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L),
-			argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
+			argThat(new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
 
 		// playing back buffered events
 		check(sequence[7], buffer.pollNext().get());
@@ -213,7 +214,7 @@ public class BarrierBufferAlignmentLimitTest {
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		BarrierBuffer buffer = new BarrierBuffer(
+		CheckpointedInputGate buffer = new CheckpointedInputGate(
 			gate,
 			new BufferSpiller(ioManager, gate.getPageSize(), 500),
 			"Testing",
@@ -237,7 +238,7 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[4], buffer.pollNext().get());
 		validateAlignmentTime(startTs, buffer);
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L),
-			argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
+			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
 
 		// replay buffered data - in the middle, the alignment for checkpoint 4 starts
 		check(sequence[6], buffer.pollNext().get());
@@ -314,7 +315,7 @@ public class BarrierBufferAlignmentLimitTest {
 		}
 	}
 
-	private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+	private static void validateAlignmentTime(long startTimestamp, CheckpointedInputGate buffer) {
 		final long elapsed = System.nanoTime() - startTimestamp;
 		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
similarity index 95%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 7fc8a5d..7da0aa3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.fail;
  * and randomly generate checkpoint barriers. The two streams are very
  * unaligned, putting heavy work on the BarrierBuffer.
  */
-public class BarrierBufferMassiveRandomTest {
+public class CheckpointBarrierAlignerMassiveRandomTest {
 
 	private static final int PAGE_SIZE = 1024;
 
@@ -62,10 +62,10 @@ public class BarrierBufferMassiveRandomTest {
 					new BufferPool[] { pool1, pool2 },
 					new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
 
-			BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, new BufferSpiller(ioMan, myIG.getPageSize()));
+			CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(myIG, new BufferSpiller(ioMan, myIG.getPageSize()));
 
 			for (int i = 0; i < 2000000; i++) {
-				BufferOrEvent boe = barrierBuffer.pollNext().get();
+				BufferOrEvent boe = checkpointedInputGate.pollNext().get();
 				if (boe.isBuffer()) {
 					boe.getBuffer().recycleBuffer();
 				}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
similarity index 63%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
index 13c4aad..687c95d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
@@ -60,9 +60,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferStorage} implements.
+ * Tests for the behavior of the {@link CheckpointBarrierAligner} with different {@link BufferStorage} implements.
  */
-public abstract class BarrierBufferTestBase {
+public abstract class CheckpointBarrierAlignerTestBase {
 
 	protected static final int PAGE_SIZE = 512;
 
@@ -70,9 +70,9 @@ public abstract class BarrierBufferTestBase {
 
 	private static int sizeCounter = 1;
 
-	BarrierBuffer buffer;
+	CheckpointedInputGate inputGate;
 
-	protected BarrierBuffer createBarrierBuffer(
+	protected CheckpointedInputGate createBarrierBuffer(
 		int numberOfChannels,
 		BufferOrEvent[] sequence,
 		@Nullable AbstractInvokable toNotify) throws IOException {
@@ -80,25 +80,25 @@ public abstract class BarrierBufferTestBase {
 		return createBarrierBuffer(gate, toNotify);
 	}
 
-	protected BarrierBuffer createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException {
+	protected CheckpointedInputGate createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException {
 		return createBarrierBuffer(numberOfChannels, sequence, null);
 	}
 
-	protected BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
+	protected CheckpointedInputGate createBarrierBuffer(InputGate gate) throws IOException {
 		return createBarrierBuffer(gate, null);
 	}
 
-	abstract BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException;
+	abstract CheckpointedInputGate createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException;
 
 	abstract void validateAlignmentBuffered(long actualBytesBuffered, BufferOrEvent... sequence);
 
 	@After
 	public void ensureEmpty() throws Exception {
-		assertFalse(buffer.pollNext().isPresent());
-		assertTrue(buffer.isFinished());
-		assertTrue(buffer.isEmpty());
+		assertFalse(inputGate.pollNext().isPresent());
+		assertTrue(inputGate.isFinished());
+		assertTrue(inputGate.isEmpty());
 
-		buffer.cleanup();
+		inputGate.cleanup();
 	}
 
 	// ------------------------------------------------------------------------
@@ -115,13 +115,13 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0), createBuffer(0),
 			createBuffer(0), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(1, sequence);
+		inputGate = createBarrierBuffer(1, sequence);
 
 		for (BufferOrEvent boe : sequence) {
-			assertEquals(boe, buffer.pollNext().get());
+			assertEquals(boe, inputGate.pollNext().get());
 		}
 
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 	}
 
 	/**
@@ -136,13 +136,13 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(3), createBuffer(1), createEndOfPartition(3),
 			createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)
 		};
-		buffer = createBarrierBuffer(4, sequence);
+		inputGate = createBarrierBuffer(4, sequence);
 
 		for (BufferOrEvent boe : sequence) {
-			assertEquals(boe, buffer.pollNext().get());
+			assertEquals(boe, inputGate.pollNext().get());
 		}
 
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 	}
 
 	/**
@@ -161,13 +161,13 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0), createEndOfPartition(0)
 		};
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer = createBarrierBuffer(1, sequence, handler);
+		inputGate = createBarrierBuffer(1, sequence, handler);
 
 		handler.setNextExpectedCheckpointId(1L);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-				assertEquals(boe, buffer.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -211,81 +211,81 @@ public abstract class BarrierBufferTestBase {
 			createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
 		};
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer = createBarrierBuffer(3, sequence, handler);
+		inputGate = createBarrierBuffer(3, sequence, handler);
 
 		handler.setNextExpectedCheckpointId(1L);
 
 		// pre checkpoint 1
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(1L, handler.getNextExpectedCheckpointId());
 
 		long startTs = System.nanoTime();
 
 		// blocking while aligning for checkpoint 1
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(1L, handler.getNextExpectedCheckpointId());
 
 		// checkpoint 1 done, returning buffered data
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(2L, handler.getNextExpectedCheckpointId());
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 		validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[5], sequence[6]);
 
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// pre checkpoint 2
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(2L, handler.getNextExpectedCheckpointId());
 
 		// checkpoint 2 barriers come together
 		startTs = System.nanoTime();
-		check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(3L, handler.getNextExpectedCheckpointId());
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 		validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment());
 
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 starts, data buffered
-		check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
 		validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[20], sequence[21]);
 		assertEquals(4L, handler.getNextExpectedCheckpointId());
-		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 4 happens without extra data
 
 		// pre checkpoint 5
-		check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
 
 		validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment());
 		assertEquals(5L, handler.getNextExpectedCheckpointId());
 
-		check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[29], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[29], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 5 aligning
-		check(sequence[31], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[31], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// buffered data from checkpoint 5 alignment
-		check(sequence[34], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[36], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[38], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[39], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[34], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[36], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[38], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[39], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// remaining data
-		check(sequence[41], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[42], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[43], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[44], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[41], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[42], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[43], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[44], inputGate.pollNext().get(), PAGE_SIZE);
 
 		validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(),
 			sequence[34], sequence[36], sequence[38], sequence[39]);
@@ -304,36 +304,36 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
 		};
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer = createBarrierBuffer(3, sequence, handler);
+		inputGate = createBarrierBuffer(3, sequence, handler);
 
 		handler.setNextExpectedCheckpointId(1L);
 
 		// pre-checkpoint 1
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(1L, handler.getNextExpectedCheckpointId());
 
 		// pre-checkpoint 2
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(2L, handler.getNextExpectedCheckpointId());
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 2 alignment
 		long startTs = System.nanoTime();
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 
 		// end of stream: remaining buffered contents
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
 	}
 
 	/**
@@ -379,69 +379,69 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0), createEndOfPartition(0)
 		};
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer = createBarrierBuffer(3, sequence, handler);
+		inputGate = createBarrierBuffer(3, sequence, handler);
 
 		handler.setNextExpectedCheckpointId(1L);
 
 		// around checkpoint 1
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
 
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(2L, handler.getNextExpectedCheckpointId());
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// alignment of checkpoint 2 - buffering also some barriers for
 		// checkpoints 3 and 4
 		long startTs = System.nanoTime();
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[23], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[23], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 2 completed
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
-		check(sequence[25], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[30], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
+		check(sequence[25], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[30], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 completed (emit buffered)
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// past checkpoint 3
-		check(sequence[36], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[38], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[36], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[38], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 4 completed (emit buffered)
-		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[31], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[39], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[31], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[39], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// past checkpoint 4, alignment for checkpoint 5
-		check(sequence[42], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[45], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[46], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[42], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[45], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[46], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// abort checkpoint 5 (end of partition)
-		check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// start checkpoint 6 alignment
-		check(sequence[47], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[48], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[47], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[48], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// end of input, emit remainder
-		check(sequence[43], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[44], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[43], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[44], inputGate.pollNext().get(), PAGE_SIZE);
 	}
 
 	/**
@@ -471,58 +471,58 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0), createEndOfPartition(0)
 		};
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer = createBarrierBuffer(3, sequence, toNotify);
+		inputGate = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
 		// initial data
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// align checkpoint 1
 		startTs = System.nanoTime();
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getLatestCheckpointId());
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(1L, inputGate.getLatestCheckpointId());
 
 		// checkpoint done - replay buffered
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// alignment of checkpoint 2
 		startTs = System.nanoTime();
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 2 aborted, checkpoint 3 started
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(3L, buffer.getLatestCheckpointId());
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(3L, inputGate.getLatestCheckpointId());
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 		verify(toNotify).abortCheckpointOnBarrier(eq(2L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 alignment in progress
-		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 aborted (end of partition)
-		check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify).abortCheckpointOnBarrier(eq(3L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)));
 
 		// replay buffered data from checkpoint 3
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// all the remaining messages
-		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[23], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[23], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
 	}
 
 	/**
@@ -556,50 +556,50 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0), createEndOfPartition(0)
 		};
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer = createBarrierBuffer(3, sequence, handler);
+		inputGate = createBarrierBuffer(3, sequence, handler);
 
 		handler.setNextExpectedCheckpointId(1L);
 
 		// checkpoint 1
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getLatestCheckpointId());
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(1L, inputGate.getLatestCheckpointId());
 
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// alignment of checkpoint 2
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getLatestCheckpointId());
-		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(2L, inputGate.getLatestCheckpointId());
+		check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
 
 		long startTs = System.nanoTime();
 
 		// checkpoint 2 aborted, checkpoint 4 started. replay buffered
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(4L, buffer.getLatestCheckpointId());
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(4L, inputGate.getLatestCheckpointId());
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// align checkpoint 4 remainder
-		check(sequence[25], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[25], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
 
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 
 		// checkpoint 4 aborted (due to end of partition)
-		check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[29], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[30], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[29], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[30], inputGate.pollNext().get(), PAGE_SIZE);
 	}
 
 	/**
@@ -644,48 +644,48 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(2), createEndOfPartition(2),
 			createBuffer(0), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(3, sequence);
+		inputGate = createBarrierBuffer(3, sequence);
 
 		// checkpoint 1
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getLatestCheckpointId());
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(1L, inputGate.getLatestCheckpointId());
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// alignment of checkpoint 2
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getLatestCheckpointId());
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(2L, inputGate.getLatestCheckpointId());
 
 		// checkpoint 2 completed
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 skipped, alignment for 4 started
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(4L, buffer.getLatestCheckpointId());
-		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[30], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(4L, inputGate.getLatestCheckpointId());
+		check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[30], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 4 completed
-		check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[29], buffer.pollNext().get(), PAGE_SIZE);
-
-		check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[34], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[35], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[36], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[29], inputGate.pollNext().get(), PAGE_SIZE);
+
+		check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[34], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[35], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[36], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
 	}
 
 	@Test
@@ -698,11 +698,11 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0)
 		};
 		AbstractInvokable validator = new CheckpointSequenceValidator(-3);
-		buffer = createBarrierBuffer(2, sequence, validator);
+		inputGate = createBarrierBuffer(2, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
-				assertEquals(boe, buffer.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -720,34 +720,34 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
 		};
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer = createBarrierBuffer(3, sequence, handler);
+		inputGate = createBarrierBuffer(3, sequence, handler);
 
 		handler.setNextExpectedCheckpointId(1L);
 
 		// pre-checkpoint 1
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(1L, handler.getNextExpectedCheckpointId());
 
 		// pre-checkpoint 2
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 		assertEquals(2L, handler.getNextExpectedCheckpointId());
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 2 alignment
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// drain buffer
-		buffer.pollNext().get();
-		buffer.pollNext().get();
-		buffer.pollNext().get();
-		buffer.pollNext().get();
-		buffer.pollNext().get();
+		inputGate.pollNext().get();
+		inputGate.pollNext().get();
+		inputGate.pollNext().get();
+		inputGate.pollNext().get();
+		inputGate.pollNext().get();
 	}
 
 	@Test
@@ -779,36 +779,36 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(3),
 			createEndOfPartition(3)
 		};
-		buffer = createBarrierBuffer(4, sequence);
+		inputGate = createBarrierBuffer(4, sequence);
 
 		// pre checkpoint 2
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 alignment
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getLatestCheckpointId());
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(2L, inputGate.getLatestCheckpointId());
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint 3 buffered
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(3L, buffer.getLatestCheckpointId());
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(3L, inputGate.getLatestCheckpointId());
 
 		// after checkpoint 4
-		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(4L, buffer.getLatestCheckpointId());
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-
-		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(5L, buffer.getLatestCheckpointId());
-		check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(4L, inputGate.getLatestCheckpointId());
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+
+		check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(5L, inputGate.getLatestCheckpointId());
+		check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
 	}
 
 	@Test
@@ -831,25 +831,25 @@ public abstract class BarrierBufferTestBase {
 			// final end of stream
 			createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(3, sequence);
+		inputGate = createBarrierBuffer(3, sequence);
 
 		// data after first checkpoint
-		check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(1L, buffer.getLatestCheckpointId());
+		check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(1L, inputGate.getLatestCheckpointId());
 
 		// alignment of second checkpoint
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(2L, buffer.getLatestCheckpointId());
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(2L, inputGate.getLatestCheckpointId());
 
 		// first end-of-partition encountered: checkpoint will not be completed
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
 	}
 
 	@Test
@@ -866,26 +866,26 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0)
 		};
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer = createBarrierBuffer(1, sequence, toNotify);
+		inputGate = createBarrierBuffer(1, sequence, toNotify);
 
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(5L, buffer.getLatestCheckpointId());
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(5L, inputGate.getLatestCheckpointId());
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-		assertEquals(6L, buffer.getLatestCheckpointId());
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+		assertEquals(6L, inputGate.getLatestCheckpointId());
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 	}
 
 	@Test
@@ -925,62 +925,62 @@ public abstract class BarrierBufferTestBase {
 			/* 37 */ createBuffer(0)
 		};
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer = createBarrierBuffer(3, sequence, toNotify);
+		inputGate = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
 		// successful first checkpoint, with some aligned buffers
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
 		startTs = System.nanoTime();
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// canceled checkpoint on last barrier
 		startTs = System.nanoTime();
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// one more successful checkpoint
-		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
 		startTs = System.nanoTime();
-		check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
-		check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
+		check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// this checkpoint gets immediately canceled
-		check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
 		// some buffers
-		check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// a simple successful checkpoint
 		startTs = System.nanoTime();
-		check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
-		check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
+		check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
 
-		check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 	}
 
 	@Test
@@ -1011,41 +1011,41 @@ public abstract class BarrierBufferTestBase {
 			/* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
 		};
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer = createBarrierBuffer(3, sequence, toNotify);
+		inputGate = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// starting first checkpoint
 		startTs = System.nanoTime();
-		check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// finished first checkpoint
-		check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// re-read the queued cancellation barriers
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
 
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// no further alignment should have happened
-		assertEquals(0L, buffer.getAlignmentDurationNanos());
+		assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
 		// no further checkpoint (abort) notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
@@ -1093,44 +1093,44 @@ public abstract class BarrierBufferTestBase {
 			/* 18 */ createBuffer(0), createBuffer(1), createBuffer(2)
 		};
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer = createBarrierBuffer(3, sequence, toNotify);
+		inputGate = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// starting first checkpoint
 		startTs = System.nanoTime();
-		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// cancelled by cancellation barrier
-		check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 		verify(toNotify).abortCheckpointOnBarrier(eq(1L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
 		// the next checkpoint alignment starts now
 		startTs = System.nanoTime();
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint done
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
 		// queued data
-		check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// trailing data
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// check overall notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
@@ -1168,40 +1168,40 @@ public abstract class BarrierBufferTestBase {
 			/* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
 		};
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer = createBarrierBuffer(3, sequence, toNotify);
+		inputGate = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
 		// validate the sequence
 
-		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// beginning of first checkpoint
-		check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// future barrier aborts checkpoint
 		startTs = System.nanoTime();
-		check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L),
 			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
-		check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// alignment of next checkpoint
-		check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// checkpoint finished
-		check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-		validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+		check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+		validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-		check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// remaining data
-		check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
-		check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+		check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
+		check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
 		// check overall notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
similarity index 89%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
index 5112f63..2218680 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
@@ -42,26 +42,26 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for the behavior of the barrier tracker.
  */
-public class BarrierTrackerTest {
+public class CheckpointBarrierTrackerTest {
 
 	private static final int PAGE_SIZE = 512;
 
-	private CheckpointedInputGate tracker;
+	private CheckpointedInputGate inputGate;
 
 	@After
 	public void ensureEmpty() throws Exception {
-		assertFalse(tracker.pollNext().isPresent());
-		assertTrue(tracker.isFinished());
-		assertTrue(tracker.isEmpty());
+		assertFalse(inputGate.pollNext().isPresent());
+		assertTrue(inputGate.isFinished());
+		assertTrue(inputGate.isEmpty());
 	}
 
 	@Test
 	public void testSingleChannelNoBarriers() throws Exception {
 		BufferOrEvent[] sequence = { createBuffer(0), createBuffer(0), createBuffer(0) };
-		tracker = createBarrierTracker(1, sequence);
+		inputGate = createBarrierTracker(1, sequence);
 
 		for (BufferOrEvent boe : sequence) {
-			assertEquals(boe, tracker.pollNext().get());
+			assertEquals(boe, inputGate.pollNext().get());
 		}
 	}
 
@@ -71,10 +71,10 @@ public class BarrierTrackerTest {
 				createBuffer(1), createBuffer(0), createBuffer(3),
 				createBuffer(1), createBuffer(1), createBuffer(2)
 		};
-		tracker = createBarrierTracker(4, sequence);
+		inputGate = createBarrierTracker(4, sequence);
 
 		for (BufferOrEvent boe : sequence) {
-			assertEquals(boe, tracker.pollNext().get());
+			assertEquals(boe, inputGate.pollNext().get());
 		}
 	}
 
@@ -91,11 +91,11 @@ public class BarrierTrackerTest {
 		};
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
-		tracker = createBarrierTracker(1, sequence, validator);
+		inputGate = createBarrierTracker(1, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -113,11 +113,11 @@ public class BarrierTrackerTest {
 		};
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
-		tracker = createBarrierTracker(1, sequence, validator);
+		inputGate = createBarrierTracker(1, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -144,11 +144,11 @@ public class BarrierTrackerTest {
 		};
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(1, 2, 3, 4);
-		tracker = createBarrierTracker(3, sequence, validator);
+		inputGate = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -179,11 +179,11 @@ public class BarrierTrackerTest {
 		};
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(1, 2, 4);
-		tracker = createBarrierTracker(3, sequence, validator);
+		inputGate = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -253,11 +253,11 @@ public class BarrierTrackerTest {
 		};
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10);
-		tracker = createBarrierTracker(3, sequence, validator);
+		inputGate = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -278,13 +278,13 @@ public class BarrierTrackerTest {
 		// negative values mean an expected cancellation call!
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(1, 2, -4, 5, -6);
-		tracker = createBarrierTracker(1, sequence, validator);
+		inputGate = createBarrierTracker(1, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer()) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
-			assertTrue(tracker.isEmpty());
+			assertTrue(inputGate.isEmpty());
 		}
 	}
 
@@ -327,11 +327,11 @@ public class BarrierTrackerTest {
 		// negative values mean an expected cancellation call!
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
-		tracker = createBarrierTracker(3, sequence, validator);
+		inputGate = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer()) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -353,11 +353,11 @@ public class BarrierTrackerTest {
 		};
 		CheckpointSequenceValidator validator =
 			new CheckpointSequenceValidator(-1, -2);
-		tracker = createBarrierTracker(3, sequence, validator);
+		inputGate = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
-				assertEquals(boe, tracker.pollNext().get());
+				assertEquals(boe, inputGate.pollNext().get());
 			}
 		}
 	}
@@ -374,7 +374,7 @@ public class BarrierTrackerTest {
 			BufferOrEvent[] sequence,
 			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
-		return new BarrierBuffer(
+		return new CheckpointedInputGate(
 			gate,
 			new CachedBufferStorage(PAGE_SIZE, -1, "Testing"),
 			new CheckpointBarrierTracker(gate.getNumberOfInputChannels(), toNotifyOnCheckpoint));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedCheckpointBarrierAlignerTest.java
similarity index 79%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedCheckpointBarrierAlignerTest.java
index 3db884d..5b942a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedCheckpointBarrierAlignerTest.java
@@ -27,13 +27,13 @@ import javax.annotation.Nullable;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferStorage}.
+ * Tests for the behaviors of the {@link CheckpointedInputGate} with {@link CachedBufferStorage}.
  */
-public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
+public class CreditBasedCheckpointBarrierAlignerTest extends CheckpointBarrierAlignerTestBase {
 
 	@Override
-	BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) {
-		return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE), "Testing", toNotify);
+	CheckpointedInputGate createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) {
+		return new CheckpointedInputGate(gate, new CachedBufferStorage(PAGE_SIZE), "Testing", toNotify);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
similarity index 85%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
index f9541a9..c892073 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
@@ -36,9 +36,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer} with {@link BufferSpiller}.
+ * Tests for the behavior of the {@link CheckpointedInputGate} with {@link BufferSpiller}.
  */
-public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
+public class SpillingCheckpointBarrierAlignerTest extends CheckpointBarrierAlignerTestBase {
 
 	private static IOManager ioManager;
 
@@ -67,8 +67,8 @@ public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
 	}
 
 	@Override
-	BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException {
-		return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE), "Testing", toNotify);
+	CheckpointedInputGate createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException {
+		return new CheckpointedInputGate(gate, new BufferSpiller(ioManager, PAGE_SIZE), "Testing", toNotify);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 56c3889..456aea5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -31,7 +31,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.runtime.io.BarrierBufferTestBase;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
 
 import org.junit.Test;
 
@@ -110,7 +111,7 @@ public class StreamTaskCancellationBarrierTest {
 
 		// the decline call should go to the coordinator
 		verify(environment, times(1)).declineCheckpoint(eq(2L),
-			argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
+			argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
 		// a cancellation barrier should be downstream
 		Object result = testHarness.getOutput().poll();
@@ -155,7 +156,7 @@ public class StreamTaskCancellationBarrierTest {
 
 		// the decline call should go to the coordinator
 		verify(environment, times(1)).declineCheckpoint(eq(2L),
-			argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
+			argThat(new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
 		// a cancellation barrier should be downstream
 		Object result = testHarness.getOutput().poll();