You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:12 UTC

[flink] 03/16: [hotfix][network] Move queuedBuffered and currentBuffered fields to BufferStorage

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6093ebbcb008914e8dec6e75645a0e628428568
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 13 15:29:15 2019 +0200

    [hotfix][network] Move queuedBuffered and currentBuffered fields to BufferStorage
    
    This makes the BufferStorage contract more complete. Now it takes care of the whole process
    of storing and returning the data with a simpler interface (a single #rollOver method
    instead of the two different ones that existed before).
---
 .../runtime/io/AbstractBufferStorage.java          | 175 ++++++++++++++++++
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 130 +++-----------
 .../flink/streaming/runtime/io/BufferSpiller.java  |  24 ++-
 .../flink/streaming/runtime/io/BufferStorage.java  |  49 +++--
 .../streaming/runtime/io/CachedBufferStorage.java  |  22 ++-
 .../streaming/runtime/io/InputProcessorUtil.java   |   6 +-
 .../io/BarrierBufferAlignmentLimitTest.java        |   6 +-
 .../streaming/runtime/io/BufferSpillerTest.java    |   2 +-
 .../runtime/io/BufferStorageTestBase.java          | 197 ++++++++-------------
 .../runtime/io/CreditBasedBarrierBufferTest.java   |   2 +-
 .../runtime/io/SpillingBarrierBufferTest.java      |   2 +-
 11 files changed, 344 insertions(+), 271 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
new file mode 100644
index 0000000..f7e4dd7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A default abstract base class for {@link BufferStorage} implementations.
+ */
+@Internal
+public abstract class AbstractBufferStorage implements BufferStorage {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractBufferStorage.class);
+
+	/**
+	 * The pending blocked buffer/event sequences. Must be consumed before requesting further data
+	 * from the input gate.
+	 */
+	protected final ArrayDeque<BufferOrEventSequence> queuedBuffered = new ArrayDeque<>();
+
+	protected final long maxBufferedBytes;
+
+	protected final String taskName;
+
+	/**
+	 * The sequence of buffers/events that has been unblocked and must now be consumed before
+	 * requesting further data from the input gate.
+	 */
+	protected BufferOrEventSequence currentBuffered;
+
+	/** The number of bytes in the queued spilled sequences. */
+	protected long rolledBytes;
+
+	protected AbstractBufferStorage(long maxBufferedBytes, String taskName) {
+		checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+
+		this.maxBufferedBytes = maxBufferedBytes;
+		this.taskName = taskName;
+	}
+
+	@Override
+	public boolean isFull() {
+		return maxBufferedBytes > 0 && (getRolledBytes() + getPendingBytes()) > maxBufferedBytes;
+	}
+
+	@Override
+	public void rollOver() throws IOException {
+		if (currentBuffered == null) {
+			// common case: no more buffered data
+			currentBuffered = rollOverReusingResources();
+			if (currentBuffered != null) {
+				currentBuffered.open();
+			}
+		}
+		else {
+			// uncommon case: buffered data pending
+			// push back the pending data, if we have any
+			LOG.debug("{}: Checkpoint skipped via buffered data:" +
+				"Pushing back current alignment buffers and feeding back new alignment data first.", taskName);
+
+			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
+			BufferOrEventSequence bufferedNow = rollOverWithoutReusingResources();
+			if (bufferedNow != null) {
+				bufferedNow.open();
+				queuedBuffered.addFirst(currentBuffered);
+				rolledBytes += currentBuffered.size();
+				currentBuffered = bufferedNow;
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{}: Size of buffered data: {} bytes",
+				taskName,
+				currentBuffered == null ? 0L : currentBuffered.size());
+		}
+	}
+
+	/**
+	 * Starts a new sequence of buffers and events without reusing the same resources and
+	 * returns the current sequence of buffers for reading.
+	 *
+	 * @return The readable sequence of buffers and events, or 'null', if nothing was added.
+	 */
+	protected abstract BufferOrEventSequence rollOverWithoutReusingResources() throws IOException;
+
+	/**
+	 * Starts a new sequence of buffers and events reusing the same resources and
+	 * returns the current sequence of buffers for reading.
+	 *
+	 * @return The readable sequence of buffers and events, or 'null', if nothing was added.
+	 */
+	protected abstract BufferOrEventSequence rollOverReusingResources() throws IOException;
+
+	@Override
+	public void close() throws IOException {
+		if (currentBuffered != null) {
+			currentBuffered.cleanup();
+		}
+		for (BufferOrEventSequence seq : queuedBuffered) {
+			seq.cleanup();
+		}
+		queuedBuffered.clear();
+		rolledBytes = 0L;
+	}
+
+	@Override
+	public long getRolledBytes() {
+		return rolledBytes;
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return currentBuffered == null;
+	}
+
+	@Override
+	public Optional<BufferOrEvent> pollNext() throws IOException {
+		if (currentBuffered == null) {
+			return Optional.empty();
+		}
+		// TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking
+		Optional<BufferOrEvent> next = Optional.ofNullable(currentBuffered.getNext());
+		if (!next.isPresent()) {
+			completeBufferedSequence();
+		}
+		return next;
+	}
+
+	protected void completeBufferedSequence() throws IOException {
+		LOG.debug("{}: Finished feeding back buffered data.", taskName);
+
+		currentBuffered.cleanup();
+		currentBuffered = queuedBuffered.pollFirst();
+		if (currentBuffered != null) {
+			currentBuffered.open();
+			rolledBytes -= currentBuffered.size();
+		}
+	}
+
+	@Override
+	public long currentBufferedSize() {
+		return currentBuffered != null ? currentBuffered.size() : 0L;
+	}
+
+	@Override
+	public long getMaxBufferedBytes() {
+		return maxBufferedBytes;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index b2e6ea1..23717f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -36,11 +36,9 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -68,26 +66,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	/** To utility to write blocked data to a file channel. */
 	private final BufferStorage bufferStorage;
 
-	/**
-	 * The pending blocked buffer/event sequences. Must be consumed before requesting further data
-	 * from the input gate.
-	 */
-	private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
-
-	/**
-	 * The maximum number of bytes that may be buffered before an alignment is broken. -1 means
-	 * unlimited.
-	 */
-	private final long maxBufferedBytes;
-
 	private final String taskName;
 
-	/**
-	 * The sequence of buffers/events that has been unblocked and must now be consumed before
-	 * requesting further data from the input gate.
-	 */
-	private BufferOrEventSequence currentBuffered;
-
 	@Nullable
 	private final AbstractInvokable toNotifyOnCheckpoint;
 
@@ -103,9 +83,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	/** The number of already closed channels. */
 	private int numClosedChannels;
 
-	/** The number of bytes in the queued spilled sequences. */
-	private long numQueuedBytes;
-
 	/** The timestamp as in {@link System#nanoTime()} at which the last alignment started. */
 	private long startOfAlignmentTimestamp;
 
@@ -116,7 +93,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private boolean endOfStream;
 
 	/** Indicate end of the input. Set to true after encountering {@link #endOfStream} and depleting
-	 * {@link #currentBuffered}. */
+	 * {@link #bufferStorage}. */
 	private boolean isFinished;
 
 	/**
@@ -129,7 +106,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 */
 	@VisibleForTesting
 	BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
-		this (inputGate, bufferStorage, -1, "Testing: No task associated", null);
+		this (inputGate, bufferStorage, "Testing: No task associated", null);
 	}
 
 	/**
@@ -141,25 +118,20 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 *
 	 * @param inputGate The input gate to draw the buffers and events from.
 	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
-	 * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
 	 * @param taskName The task name for logging.
 	 * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications.
 	 */
 	BarrierBuffer(
-			InputGate inputGate,
-			BufferStorage bufferStorage,
-			long maxBufferedBytes,
-			String taskName,
-			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
-		checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+		InputGate inputGate,
+		BufferStorage bufferStorage,
+		String taskName,
+		@Nullable AbstractInvokable toNotifyOnCheckpoint) {
 
 		this.inputGate = inputGate;
-		this.maxBufferedBytes = maxBufferedBytes;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
 
 		this.bufferStorage = checkNotNull(bufferStorage);
-		this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
 
 		this.taskName = taskName;
 		this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
@@ -167,7 +139,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	@Override
 	public CompletableFuture<?> isAvailable() {
-		if (currentBuffered == null) {
+		if (bufferStorage.isEmpty()) {
 			return inputGate.isAvailable();
 		}
 		return AVAILABLE;
@@ -182,14 +154,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
 			Optional<BufferOrEvent> next;
-			if (currentBuffered == null) {
+			if (bufferStorage.isEmpty()) {
 				next = inputGate.pollNext();
 			}
 			else {
 				// TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking
-				next = Optional.ofNullable(currentBuffered.getNext());
+				next = bufferStorage.pollNext();
 				if (!next.isPresent()) {
-					completeBufferedSequence();
 					return pollNext();
 				}
 			}
@@ -202,7 +173,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (isBlocked(bufferOrEvent.getChannelIndex())) {
 				// if the channel is blocked, we just store the BufferOrEvent
 				bufferStorage.add(bufferOrEvent);
-				checkSizeLimit();
+				if (bufferStorage.isFull()) {
+					sizeLimitExceeded();
+				}
 			}
 			else if (bufferOrEvent.isBuffer()) {
 				return next;
@@ -241,17 +214,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 
-	private void completeBufferedSequence() throws IOException {
-		LOG.debug("{}: Finished feeding back buffered data.", taskName);
-
-		currentBuffered.cleanup();
-		currentBuffered = queuedBuffered.pollFirst();
-		if (currentBuffered != null) {
-			currentBuffered.open();
-			numQueuedBytes -= currentBuffered.size();
-		}
-	}
-
 	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
@@ -420,10 +382,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			CheckpointMetaData checkpointMetaData =
 					new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
 
-			long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;
-
 			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
-					.setBytesBufferedInAlignment(bytesBuffered)
+					.setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
 					.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
 			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
@@ -444,25 +404,24 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 
-	private void checkSizeLimit() throws Exception {
-		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferStorage.getBytesBlocked()) > maxBufferedBytes) {
-			// exceeded our limit - abort this checkpoint
-			LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
-				taskName,
-				currentCheckpointId,
-				maxBufferedBytes);
+	private void sizeLimitExceeded() throws Exception {
+		long maxBufferedBytes = bufferStorage.getMaxBufferedBytes();
+		// exceeded our limit - abort this checkpoint
+		LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
+			taskName,
+			currentCheckpointId,
+			maxBufferedBytes);
 
-			releaseBlocksAndResetBarriers();
-			notifyAbort(currentCheckpointId,
-				new CheckpointException(
-					"Max buffered bytes: " + maxBufferedBytes,
-					CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
-		}
+		releaseBlocksAndResetBarriers();
+		notifyAbort(currentCheckpointId,
+			new CheckpointException(
+				"Max buffered bytes: " + maxBufferedBytes,
+				CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
 	}
 
 	@Override
 	public boolean isEmpty() {
-		return currentBuffered == null;
+		return bufferStorage.isEmpty();
 	}
 
 	@Override
@@ -473,14 +432,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	@Override
 	public void cleanup() throws IOException {
 		bufferStorage.close();
-		if (currentBuffered != null) {
-			currentBuffered.cleanup();
-		}
-		for (BufferOrEventSequence seq : queuedBuffered) {
-			seq.cleanup();
-		}
-		queuedBuffered.clear();
-		numQueuedBytes = 0L;
 	}
 
 	private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
@@ -535,34 +486,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			blockedChannels[i] = false;
 		}
 
-		if (currentBuffered == null) {
-			// common case: no more buffered data
-			currentBuffered = bufferStorage.rollOverReusingResources();
-			if (currentBuffered != null) {
-				currentBuffered.open();
-			}
-		}
-		else {
-			// uncommon case: buffered data pending
-			// push back the pending data, if we have any
-			LOG.debug("{}: Checkpoint skipped via buffered data:" +
-					"Pushing back current alignment buffers and feeding back new alignment data first.", taskName);
-
-			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
-			BufferOrEventSequence bufferedNow = bufferStorage.rollOverWithoutReusingResources();
-			if (bufferedNow != null) {
-				bufferedNow.open();
-				queuedBuffered.addFirst(currentBuffered);
-				numQueuedBytes += currentBuffered.size();
-				currentBuffered = bufferedNow;
-			}
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{}: Size of buffered data: {} bytes",
-				taskName,
-				currentBuffered == null ? 0L : currentBuffered.size());
-		}
+		bufferStorage.rollOver();
 
 		// the next barrier that comes must assume it is the first
 		numBarriersReceived = 0;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 59877a0..4fbfaee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -53,7 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 @Internal
 @Deprecated
-public class BufferSpiller implements BufferStorage {
+public class BufferSpiller extends AbstractBufferStorage {
 
 	/** Size of header in bytes (see add method). */
 	static final int HEADER_SIZE = 9;
@@ -91,14 +91,25 @@ public class BufferSpiller implements BufferStorage {
 	/** The number of bytes written since the last roll over. */
 	private long bytesWritten;
 
+	public BufferSpiller(IOManager ioManager, int pageSize) throws IOException {
+		this(ioManager, pageSize, -1);
+	}
+
+	public BufferSpiller(IOManager ioManager, int pageSize, long maxBufferedBytes) throws IOException {
+		this(ioManager, pageSize, maxBufferedBytes, "Unknown");
+	}
+
 	/**
 	 * Creates a new {@link BufferSpiller}, spilling to one of the I/O manager's temp directories.
 	 *
 	 * @param ioManager The I/O manager for access to the temp directories.
 	 * @param pageSize The page size used to re-create spilled buffers.
+	 * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
+	 * @param taskName The task name for logging.
 	 * @throws IOException Thrown if the temp files for spilling cannot be initialized.
 	 */
-	public BufferSpiller(IOManager ioManager, int pageSize) throws IOException {
+	public BufferSpiller(IOManager ioManager, int pageSize, long maxBufferedBytes, String taskName) throws IOException {
+		super(maxBufferedBytes, taskName);
 		this.pageSize = pageSize;
 
 		this.readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
@@ -118,12 +129,6 @@ public class BufferSpiller implements BufferStorage {
 		createSpillingChannel();
 	}
 
-	/**
-	 * Adds a buffer or event to the sequence of spilled buffers and events.
-	 *
-	 * @param boe The buffer or event to add and spill.
-	 * @throws IOException Thrown, if the buffer of event could not be spilled.
-	 */
 	@Override
 	public void add(BufferOrEvent boe) throws IOException {
 		try {
@@ -222,6 +227,7 @@ public class BufferSpiller implements BufferStorage {
 		if (!currentSpillFile.delete()) {
 			throw new IOException("Cannot delete spill file");
 		}
+		super.close();
 	}
 
 	/**
@@ -230,7 +236,7 @@ public class BufferSpiller implements BufferStorage {
 	 * @return the number of bytes written in the current spill file
 	 */
 	@Override
-	public long getBytesBlocked() {
+	public long getPendingBytes() {
 		return bytesWritten;
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
index 7d4dff0..8e6194d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
@@ -22,14 +22,15 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * The {@link BufferStorage} takes the buffers and events from a data stream and adds them in a sequence.
- * After a number of elements have been added, the {@link BufferStorage} can "roll over":
- * It presents the added elements as a readable sequence, and creates a new sequence.
+ * After a number of elements have been added, the {@link BufferStorage} can {@link #rollOver() "roll over"}.
+ * After rolling over, previously stored buffers are available for reading via {@link #pollNext()}.
  */
 @Internal
-public interface BufferStorage {
+public interface BufferStorage extends AutoCloseable {
 
 	/**
 	 * Adds a buffer or event to the {@link BufferStorage}.
@@ -39,30 +40,40 @@ public interface BufferStorage {
 	void add(BufferOrEvent boe) throws IOException;
 
 	/**
-	 * Starts a new sequence of buffers and event without reusing the same resources and
-	 * returns the current sequence of buffers for reading.
-	 *
-	 * @return The readable sequence of buffers and events, or 'null', if nothing was added.
+	 * @return true if size limit was exceeded.
 	 */
-	BufferOrEventSequence rollOverWithoutReusingResources() throws IOException;
+	boolean isFull();
 
 	/**
-	 * Starts a new sequence of buffers and event reusing the same resources and
-	 * returns the current sequence of buffers for reading.
-	 *
-	 * @return The readable sequence of buffers and events, or 'null', if nothing was added.
+	 * Start returning next sequence of stored {@link BufferOrEvent}s.
 	 */
-	BufferOrEventSequence rollOverReusingResources() throws IOException;
+	void rollOver() throws IOException;
 
 	/**
-	 * Cleans up all the resources in the current sequence.
+	 * @return the number of pending bytes blocked in the current sequence - bytes that have not
+	 * yet been rolled, but are already blocked.
 	 */
-	void close() throws IOException;
+	long getPendingBytes();
 
 	/**
-	 * Gets the number of bytes blocked in the current sequence.
-	 *
-	 * @return the number of bytes blocked in the current sequence.
+	 * @return the number of already rolled bytes in blocked sequences.
+	 */
+	long getRolledBytes();
+
+	/**
+	 * @return true if this {@link BufferStorage} doesn't store any data.
 	 */
-	long getBytesBlocked();
+	boolean isEmpty();
+
+	Optional<BufferOrEvent> pollNext() throws IOException;
+
+	long currentBufferedSize();
+
+	long getMaxBufferedBytes();
+
+	/**
+	 * Cleans up all the resources in the current sequence.
+	 */
+	@Override
+	void close() throws IOException;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
index e0a79c2..628a69c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 
 /**
@@ -34,7 +35,7 @@ import java.util.ArrayDeque;
  * alignment in exactly-once mode.
  */
 @Internal
-public class CachedBufferStorage implements BufferStorage {
+public class CachedBufferStorage extends AbstractBufferStorage {
 
 	/** The page size, to estimate the total cached data size. */
 	private final int pageSize;
@@ -46,13 +47,23 @@ public class CachedBufferStorage implements BufferStorage {
 	private ArrayDeque<BufferOrEvent> currentBuffers;
 
 	/**
-	 * Creates a new {@link CachedBufferStorage}, caching the buffers or events in memory queue.
+	 * Create a new {@link CachedBufferStorage} with unlimited storage.
 	 *
 	 * @param pageSize The page size used to estimate the cached size.
 	 */
 	public CachedBufferStorage(int pageSize) {
+		this(pageSize, -1, "Unknown");
+	}
+
+	/**
+	 * Creates a new {@link CachedBufferStorage}, caching the buffers or events in memory queue.
+	 *
+	 * @param pageSize The page size used to estimate the cached size.
+	 */
+	public CachedBufferStorage(int pageSize, long maxBufferedBytes, String taskName) {
+		super(maxBufferedBytes, taskName);
 		this.pageSize = pageSize;
-		this.currentBuffers = new ArrayDeque<BufferOrEvent>();
+		this.currentBuffers = new ArrayDeque<>();
 	}
 
 	@Override
@@ -84,17 +95,18 @@ public class CachedBufferStorage implements BufferStorage {
 	}
 
 	@Override
-	public void close() {
+	public void close() throws IOException {
 		BufferOrEvent boe;
 		while ((boe = currentBuffers.poll()) != null) {
 			if (boe.isBuffer()) {
 				boe.getBuffer().recycleBuffer();
 			}
 		}
+		super.close();
 	}
 
 	@Override
-	public long getBytesBlocked() {
+	public long getPendingBytes() {
 		return bytesBlocked;
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index ebef48f..c9ec6bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -56,15 +56,13 @@ public class InputProcessorUtil {
 			if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
 				barrierHandler = new BarrierBuffer(
 					inputGate,
-					new CachedBufferStorage(inputGate.getPageSize()),
-					maxAlign,
+					new CachedBufferStorage(inputGate.getPageSize(), maxAlign, taskName),
 					taskName,
 					checkpointedTask);
 			} else {
 				barrierHandler = new BarrierBuffer(
 					inputGate,
-					new BufferSpiller(ioManager, inputGate.getPageSize()),
-					maxAlign,
+					new BufferSpiller(ioManager, inputGate.getPageSize(), maxAlign, taskName),
 					taskName,
 					checkpointedTask);
 			}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 8c97938..2eb3f5c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -118,8 +118,7 @@ public class BarrierBufferAlignmentLimitTest {
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
 		BarrierBuffer buffer = new BarrierBuffer(
 			gate,
-			new BufferSpiller(ioManager, gate.getPageSize()),
-			1000,
+			new BufferSpiller(ioManager, gate.getPageSize(), 1000),
 			"Testing",
 			toNotify);
 
@@ -216,8 +215,7 @@ public class BarrierBufferAlignmentLimitTest {
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
 		BarrierBuffer buffer = new BarrierBuffer(
 			gate,
-			new BufferSpiller(ioManager, gate.getPageSize()),
-			500,
+			new BufferSpiller(ioManager, gate.getPageSize(), 500),
 			"Testing",
 			toNotify);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index 4633154..4d46451 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -92,7 +92,7 @@ public class BufferSpillerTest extends BufferStorageTestBase {
 		assertEquals(
 			"Changed the header format, but did not adjust the HEADER_SIZE field",
 			BufferSpiller.HEADER_SIZE + size,
-			spiller.getBytesBlocked());
+			spiller.getPendingBytes());
 	}
 
 	private static void checkNoTempFilesRemain() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
index 0485d88..b23d3e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
@@ -30,11 +30,11 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Optional;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -50,9 +50,12 @@ public abstract class BufferStorageTestBase {
 	@Test
 	public void testRollOverEmptySequences() throws IOException {
 		BufferStorage bufferStorage = createBufferStorage();
-		assertNull(bufferStorage.rollOverReusingResources());
-		assertNull(bufferStorage.rollOverReusingResources());
-		assertNull(bufferStorage.rollOverReusingResources());
+		bufferStorage.rollOver();
+		assertFalse(bufferStorage.pollNext().isPresent());
+		bufferStorage.rollOver();
+		assertFalse(bufferStorage.pollNext().isPresent());
+		bufferStorage.rollOver();
+		assertFalse(bufferStorage.pollNext().isPresent());
 	}
 
 	@Test
@@ -92,31 +95,35 @@ public abstract class BufferStorageTestBase {
 			// reset and create reader
 			bufferRnd.setSeed(bufferSeed);
 
-			BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
-			seq.open();
+			bufferStorage.rollOver();
 
 			// read and validate the sequence
 
 			int numEvent = 0;
 			for (int i = 0; i < numEventsAndBuffers; i++) {
-				BufferOrEvent next = seq.getNext();
-				assertNotNull(next);
-				if (next.isEvent()) {
+				assertFalse(bufferStorage.isEmpty());
+
+				Optional<BufferOrEvent> next = bufferStorage.pollNext();
+				assertTrue(next.isPresent());
+				BufferOrEvent bufferOrEvent = next.get();
+
+				if (bufferOrEvent.isEvent()) {
 					BufferOrEvent expected = events.get(numEvent++);
-					assertEquals(expected.getEvent(), next.getEvent());
-					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+					assertEquals(expected.getEvent(), bufferOrEvent.getEvent());
+					assertEquals(expected.getChannelIndex(), bufferOrEvent.getChannelIndex());
 				} else {
-					validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
+					validateBuffer(
+						bufferOrEvent,
+						bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
 				}
 			}
 
 			// no further data
-			assertNull(seq.getNext());
+			assertFalse(bufferStorage.pollNext().isPresent());
+			assertTrue(bufferStorage.isEmpty());
 
 			// all events need to be consumed
 			assertEquals(events.size(), numEvent);
-
-			seq.cleanup();
 		}
 	}
 
@@ -126,15 +133,11 @@ public abstract class BufferStorageTestBase {
 
 		final Random rnd = new Random();
 
-		final int maxNumEventsAndBuffers = 30000;
+		final int maxNumEventsAndBuffers = 300;
 		final int maxNumChannels = 1656;
 
-		int sequencesConsumed = 0;
-
-		ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();
-		SequenceToConsume currentSequence = null;
-		int currentNumEvents = 0;
-		int currentNumRecordAndEvents = 0;
+		ArrayDeque<ArrayDeque<BufferOrEvent>> expectedRolledSequences = new ArrayDeque<>();
+		ArrayDeque<BufferOrEvent> expectedPendingSequence = new ArrayDeque<>();
 
 		BufferStorage bufferStorage = createBufferStorage();
 
@@ -143,7 +146,9 @@ public abstract class BufferStorageTestBase {
 
 			if (round % 2 == 1) {
 				// make this an empty sequence
-				assertNull(bufferStorage.rollOverReusingResources());
+				bufferStorage.rollOver();
+				expectedRolledSequences.addFirst(expectedPendingSequence);
+				expectedPendingSequence = new ArrayDeque<>();
 			} else {
 				// proper spilled sequence
 				final long bufferSeed = rnd.nextLong();
@@ -152,12 +157,12 @@ public abstract class BufferStorageTestBase {
 				final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
 				final int numberOfChannels = rnd.nextInt(maxNumChannels) + 1;
 
-				final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
+				final ArrayList<BufferOrEvent> events = new ArrayList<>(128);
 
 				int generated = 0;
 				while (generated < numEventsAndBuffers) {
 
-					if (currentSequence == null || rnd.nextDouble() < 0.5) {
+					if (rnd.nextDouble() < 0.5) {
 						// add a new record
 						boolean isEvent = rnd.nextDouble() < 0.05;
 						BufferOrEvent evt;
@@ -168,97 +173,66 @@ public abstract class BufferStorageTestBase {
 							evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
 						}
 						bufferStorage.add(evt);
+
+						expectedPendingSequence.addLast(evt);
 						generated++;
 					} else {
 						// consume a record
-						BufferOrEvent next = currentSequence.sequence.getNext();
-						assertNotNull(next);
-						if (next.isEvent()) {
-							BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
-							assertEquals(expected.getEvent(), next.getEvent());
-							assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-						} else {
-							Random validationRnd = currentSequence.bufferRnd;
-							validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numberOfChannels));
-						}
-
-						currentNumRecordAndEvents++;
-						if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
-							// done with the sequence
-							currentSequence.sequence.cleanup();
-							sequencesConsumed++;
-
-							// validate we had all events
-							assertEquals(currentSequence.events.size(), currentNumEvents);
+						bufferStorage.rollOver();
+						expectedRolledSequences.addFirst(expectedPendingSequence);
+						expectedPendingSequence = new ArrayDeque<>();
 
-							// reset
-							currentSequence = pendingSequences.pollFirst();
-							if (currentSequence != null) {
-								currentSequence.sequence.open();
-							}
-
-							currentNumRecordAndEvents = 0;
-							currentNumEvents = 0;
-						}
+						assertNextBufferOrEvent(expectedRolledSequences, bufferStorage);
 					}
 				}
-
-				// done generating a sequence. queue it for consumption
-				bufferRnd.setSeed(bufferSeed);
-				BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
-
-				SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numberOfChannels);
-
-				if (currentSequence == null) {
-					currentSequence = stc;
-					stc.sequence.open();
-				} else {
-					pendingSequences.addLast(stc);
-				}
+				bufferStorage.rollOver();
+				expectedRolledSequences.addFirst(expectedPendingSequence);
+				expectedPendingSequence = new ArrayDeque<>();
 			}
 		}
 
 		// consume all the remainder
-		while (currentSequence != null) {
-			// consume a record
-			BufferOrEvent next = currentSequence.sequence.getNext();
-			assertNotNull(next);
-			if (next.isEvent()) {
-				BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
-				assertEquals(expected.getEvent(), next.getEvent());
-				assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-			} else {
-				Random validationRnd = currentSequence.bufferRnd;
-				validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numberOfChannels));
-			}
+		while (!expectedRolledSequences.isEmpty()) {
+			assertNextBufferOrEvent(expectedRolledSequences, bufferStorage);
+		}
+	}
 
-			currentNumRecordAndEvents++;
-			if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
-				// done with the sequence
-				currentSequence.sequence.cleanup();
-				sequencesConsumed++;
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
 
-				// validate we had all events
-				assertEquals(currentSequence.events.size(), currentNumEvents);
+	private static void assertNextBufferOrEvent(
+			ArrayDeque<ArrayDeque<BufferOrEvent>> expectedRolledSequence,
+			BufferStorage bufferStorage) throws IOException {
+		while (!expectedRolledSequence.isEmpty() && expectedRolledSequence.peekFirst().isEmpty()) {
+			expectedRolledSequence.pollFirst();
+		}
 
-				// reset
-				currentSequence = pendingSequences.pollFirst();
-				if (currentSequence != null) {
-					currentSequence.sequence.open();
-				}
+		Optional<BufferOrEvent> next = bufferStorage.pollNext();
+		if (expectedRolledSequence.isEmpty()) {
+			assertFalse(next.isPresent());
+			return;
+		}
 
-				currentNumRecordAndEvents = 0;
-				currentNumEvents = 0;
-			}
+		while (!next.isPresent() && !bufferStorage.isEmpty()) {
+			next = bufferStorage.pollNext();
 		}
 
-		assertEquals(sequences, sequencesConsumed);
+		assertTrue(next.isPresent());
+		BufferOrEvent actualBufferOrEvent = next.get();
+		BufferOrEvent expectedBufferOrEvent = expectedRolledSequence.peekFirst().pollFirst();
+
+		if (expectedBufferOrEvent.isEvent()) {
+			assertEquals(expectedBufferOrEvent.getChannelIndex(), actualBufferOrEvent.getChannelIndex());
+			assertEquals(expectedBufferOrEvent.getEvent(), actualBufferOrEvent.getEvent());
+		} else {
+			validateBuffer(
+				actualBufferOrEvent,
+				expectedBufferOrEvent.getSize(),
+				expectedBufferOrEvent.getChannelIndex());
+		}
 	}
 
-	// ------------------------------------------------------------------------
-	//  Utils
-	// ------------------------------------------------------------------------
-
 	private static BufferOrEvent generateRandomEvent(Random rnd, int numberOfChannels) {
 		long magicNumber = rnd.nextLong();
 		byte[] data = new byte[rnd.nextInt(1000)];
@@ -297,29 +271,4 @@ public abstract class BufferStorageTestBase {
 			}
 		}
 	}
-
-	/**
-	 * Wrappers the buffered sequence and related elements for consuming and validation.
-	 */
-	private static class SequenceToConsume {
-
-		final BufferOrEventSequence sequence;
-		final ArrayList<BufferOrEvent> events;
-		final Random bufferRnd;
-		final int numBuffersAndEvents;
-		final int numberOfChannels;
-
-		private SequenceToConsume(
-				Random bufferRnd,
-				ArrayList<BufferOrEvent> events,
-				BufferOrEventSequence sequence,
-				int numBuffersAndEvents,
-				int numberOfChannels) {
-			this.bufferRnd = bufferRnd;
-			this.events = events;
-			this.sequence = sequence;
-			this.numBuffersAndEvents = numBuffersAndEvents;
-			this.numberOfChannels = numberOfChannels;
-		}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
index bbfe8b6..3db884d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
@@ -33,7 +33,7 @@ public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
 
 	@Override
 	BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) {
-		return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE), -1, "Testing", toNotify);
+		return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE), "Testing", toNotify);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
index 2101f40..f9541a9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
@@ -68,7 +68,7 @@ public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
 
 	@Override
 	BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException {
-		return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE), -1, "Testing", toNotify);
+		return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE), "Testing", toNotify);
 	}
 
 	@Override