You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/19 10:28:15 UTC

[GitHub] NicoK closed pull request #6692: [FLINK-10331][network] reduce unnecessary flushing

NicoK closed pull request #6692: [FLINK-10331][network] reduce unnecessary flushing
URL: https://github.com/apache/flink/pull/6692
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 41ee03db259..8630acee9a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -485,7 +485,7 @@ private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRec
 			}
 			else {
 				// collect in memory
-				ensureBufferCapacity(numBytesChunk);
+				ensureBufferCapacity(nextRecordLength);
 				partial.segment.get(partial.position, buffer, 0, numBytesChunk);
 			}
 
@@ -502,8 +502,8 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
 			int segmentRemaining = numBytes;
 			// check where to go. if we have a partial length, we need to complete it first
 			if (this.lengthBuffer.position() > 0) {
-				int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
-				segment.get(offset, this.lengthBuffer, toPut);
+				int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+				segment.get(segmentPosition, this.lengthBuffer, toPut);
 				// did we complete the length?
 				if (this.lengthBuffer.hasRemaining()) {
 					return;
@@ -515,6 +515,8 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
 					segmentRemaining -= toPut;
 					if (this.recordLength > THRESHOLD_FOR_SPILLING) {
 						this.spillingChannel = createSpillingChannel();
+					} else {
+						ensureBufferCapacity(this.recordLength);
 					}
 				}
 			}
@@ -527,9 +529,7 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
 				// spill to file
 				ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
 				this.spillingChannel.write(toWrite);
-			}
-			else {
-				ensureBufferCapacity(accumulatedRecordBytes + toCopy);
+			} else {
 				segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
 			}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f1842911..6fb067ef8c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public void commit() {
 	 * @return number of written bytes.
 	 */
 	public int finish() {
-		positionMarker.markFinished();
+		int writtenBytes = positionMarker.markFinished();
 		commit();
-		return getWrittenBytes();
+		return writtenBytes;
 	}
 
 	public boolean isFinished() {
@@ -118,18 +118,10 @@ public boolean isFull() {
 		return positionMarker.getCached() == getMaxCapacity();
 	}
 
-	public boolean isEmpty() {
-		return positionMarker.getCached() == 0;
-	}
-
 	public int getMaxCapacity() {
 		return memorySegment.size();
 	}
 
-	private int getWrittenBytes() {
-		return positionMarker.getCached();
-	}
-
 	/**
 	 * Holds a reference to the current writer position. Negative values indicate that writer ({@link BufferBuilder}
 	 * has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer.
@@ -156,7 +148,7 @@ static int getAbsolute(int position) {
 	 * Cached writing implementation of {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 *
 	 * <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible.
 	 */
@@ -181,12 +173,19 @@ public int getCached() {
 			return PositionMarker.getAbsolute(cachedPosition);
 		}
 
-		public void markFinished() {
-			int newValue = -getCached();
+		/**
+		 * Marks this position as finished and returns the current position.
+		 *
+		 * @return current position as of {@link #getCached()}
+		 */
+		public int markFinished() {
+			int currentPosition = getCached();
+			int newValue = -currentPosition;
 			if (newValue == 0) {
 				newValue = FINISHED_EMPTY;
 			}
 			set(newValue);
+			return currentPosition;
 		}
 
 		public void move(int offset) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index 4bad92f06b0..abde3ffc793 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -42,7 +42,7 @@
 
 	private final CachedPositionMarker writerPosition;
 
-	private int currentReaderPosition = 0;
+	private int currentReaderPosition;
 
 	/**
 	 * Constructs {@link BufferConsumer} instance with content that can be changed by {@link BufferBuilder}.
@@ -74,6 +74,14 @@ private BufferConsumer(Buffer buffer, BufferBuilder.PositionMarker currentWriter
 		this.currentReaderPosition = currentReaderPosition;
 	}
 
+	/**
+	 * Checks whether the {@link BufferBuilder} has already been finished.
+	 *
+	 * <p>BEWARE: this method accesses the cached value of the position marker which is only updated
+	 * after calls to {@link #build()}!
+	 *
+	 * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> otherwise
+	 */
 	public boolean isFinished() {
 		return writerPosition.isFinished();
 	}
@@ -84,17 +92,20 @@ public boolean isFinished() {
 	 */
 	public Buffer build() {
 		writerPosition.update();
-		Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition);
-		currentReaderPosition = writerPosition.getCached();
+		int cachedWriterPosition = writerPosition.getCached();
+		Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+		currentReaderPosition = cachedWriterPosition;
 		return slice.retainBuffer();
 	}
 
 	/**
-	 * @return a retained copy of self with separate indexes - it allows two read from the same {@link MemorySegment}
+	 * Creates a retained copy of self with separate indexes which allows reading from the same {@link MemorySegment}
 	 * twice.
 	 *
-	 * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In
-	 * other words, data already consumed before copying will not be visible to the returned copies.
+	 * <p>WARNING: the newly returned {@link BufferConsumer} will have its reader index copied from the original buffer.
+	 * In other words, data already consumed before copying will not be visible to the returned copies.
+	 *
+	 * @return a retained copy of self with separate indexes
 	 */
 	public BufferConsumer copy() {
 		return new BufferConsumer(buffer.retainBuffer(), writerPosition.positionMarker, currentReaderPosition);
@@ -123,7 +134,7 @@ public int getWrittenBytes() {
 	 * Cached reading wrapper around {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 */
 	private static class CachedPositionMarker {
 		private final PositionMarker positionMarker;
@@ -133,7 +144,7 @@ public int getWrittenBytes() {
 		 */
 		private int cachedPosition;
 
-		public CachedPositionMarker(PositionMarker positionMarker) {
+		CachedPositionMarker(PositionMarker positionMarker) {
 			this.positionMarker = checkNotNull(positionMarker);
 			update();
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index deb0f4d8e75..a5bf30f4c6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -92,7 +92,7 @@ public NetworkBuffer(MemorySegment memorySegment, BufferRecycler recycler, boole
 
 	/**
 	 * Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for
-	 * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+	 * the <tt>readerIndex</tt> and <tt>size</tt> as <tt>writerIndex</tt>.
 	 *
 	 * @param memorySegment
 	 * 		backing memory segment (defines {@link #maxCapacity})
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 9aa3920934e..90daf75fcc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -327,8 +327,7 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B
 				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
-				buffer.setSize(receivedSize);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 367c62d5acf..796e86f51b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -337,8 +337,7 @@ else if (bufferProvider.isDestroyed()) {
 				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
-				buffer.setSize(receivedSize);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
index 74798625416..2f7881610dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 
+/**
+ * Exception for failed partition requests due to non-existing partitions.
+ */
 public class PartitionNotFoundException extends IOException {
 
 	private static final long serialVersionUID = 0L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c6f3e158519..d2d7fdb324b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -36,6 +36,19 @@
 
 /**
  * A pipelined in-memory only subpartition, which can be consumed once.
+ *
+ * <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second
+ * {@link BufferConsumer} (in which case we will assume the first one finished), we will
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via
+ * {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling
+ * {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and
+ * then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows
+ * no more buffers being available. This results in a buffer queue which is either empty or has an
+ * unfinished {@link BufferConsumer} left from which the notifications will eventually start again.
+ *
+ * <p>Explicit calls to {@link #flush()} will force this
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
+ * {@link BufferConsumer} present in the queue.
  */
 class PipelinedSubpartition extends ResultSubpartition {
 
@@ -66,17 +79,6 @@ public boolean add(BufferConsumer bufferConsumer) {
 		return add(bufferConsumer, false);
 	}
 
-	@Override
-	public void flush() {
-		synchronized (buffers) {
-			if (buffers.isEmpty()) {
-				return;
-			}
-			flushRequested = !buffers.isEmpty();
-			notifyDataAvailable();
-		}
-	}
-
 	@Override
 	public void finish() throws IOException {
 		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
@@ -99,7 +101,7 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish) {
 
 			if (finish) {
 				isFinished = true;
-				flush();
+				notifyDataAvailable();
 			}
 			else {
 				maybeNotifyDataAvailable();
@@ -188,17 +190,17 @@ BufferAndBacklog pollBuffer() {
 				buffer,
 				isAvailableUnsafe(),
 				getBuffersInBacklog(),
-				_nextBufferIsEvent());
+				nextBufferIsEventUnsafe());
 		}
 	}
 
 	boolean nextBufferIsEvent() {
 		synchronized (buffers) {
-			return _nextBufferIsEvent();
+			return nextBufferIsEventUnsafe();
 		}
 	}
 
-	private boolean _nextBufferIsEvent() {
+	private boolean nextBufferIsEventUnsafe() {
 		assert Thread.holdsLock(buffers);
 
 		return !buffers.isEmpty() && !buffers.peekFirst().isBuffer();
@@ -279,6 +281,23 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 		return Math.max(buffers.size(), 0);
 	}
 
+	@Override
+	public void flush() {
+		synchronized (buffers) {
+			if (buffers.isEmpty()) {
+				return;
+			}
+			if (!flushRequested) {
+				flushRequested = true; // set this before the notification!
+				// if there is more than 1 buffer, we already notified the reader
+				// (at the latest when adding the second buffer)
+				if (buffers.size() == 1) {
+					notifyDataAvailable();
+				}
+			}
+		}
+	}
+
 	private void maybeNotifyDataAvailable() {
 		// Notify only when we added first finished buffer.
 		if (getNumberOfFinishedBuffers() == 1) {
@@ -295,6 +314,9 @@ private void notifyDataAvailable() {
 	private int getNumberOfFinishedBuffers() {
 		assert Thread.holdsLock(buffers);
 
+		// NOTE: isFinished() is not guaranteed to provide the most up-to-date state here
+		// worst-case: a single finished buffer sits around until the next flush() call
+		// (but we do not offer stronger guarantees anyway)
 		if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
 			return 1;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 93e5ba15097..b32f73f8bcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -48,17 +48,17 @@
 /**
  * A result partition for data produced by a single task.
  *
- * <p> This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
+ * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
  * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
  * or more {@link ResultSubpartition} instances, which further partition the data depending on the
  * number of consuming tasks and the data {@link DistributionPattern}.
  *
- * <p> Tasks, which consume a result partition have to request one of its subpartitions. The request
+ * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
  * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
  *
  * <h2>Life-cycle</h2>
  *
- * The life-cycle of each result partition has three (possibly overlapping) phases:
+ * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
  * <ol>
  * <li><strong>Produce</strong>: </li>
  * <li><strong>Consume</strong>: </li>
@@ -67,7 +67,7 @@
  *
  * <h2>Lazy deployment and updates of consuming tasks</h2>
  *
- * Before a consuming task can request the result, it has to be deployed. The time of deployment
+ * <p>Before a consuming task can request the result, it has to be deployed. The time of deployment
  * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined
  * results, receivers are deployed as soon as the first buffer is added to the result partition.
  * With blocking results on the other hand, receivers are deployed after the partition is finished.
@@ -79,7 +79,7 @@
 public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
-	
+
 	private final String owningTaskName;
 
 	private final TaskActions taskActions;
@@ -174,10 +174,10 @@ public ResultPartition(
 
 	/**
 	 * Registers a buffer pool with this result partition.
-	 * <p>
-	 * There is one pool for each result partition, which is shared by all its sub partitions.
-	 * <p>
-	 * The pool is registered with the partition *after* it as been constructed in order to conform
+	 *
+	 * <p>There is one pool for each result partition, which is shared by all its sub partitions.
+	 *
+	 * <p>The pool is registered with the partition *after* it has been constructed in order to conform
 	 * to the life-cycle of task registrations in the {@link TaskManager}.
 	 */
 	public void registerBufferPool(BufferPool bufferPool) {
@@ -276,9 +276,9 @@ public void flush(int subpartitionIndex) {
 	/**
 	 * Finishes the result partition.
 	 *
-	 * <p> After this operation, it is not possible to add further data to the result partition.
+	 * <p>After this operation, it is not possible to add further data to the result partition.
 	 *
-	 * <p> For BLOCKING results, this will trigger the deployment of consuming tasks.
+	 * <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
 	 */
 	public void finish() throws IOException {
 		boolean success = false;
@@ -366,7 +366,7 @@ public int getNumTargetKeyGroups() {
 	/**
 	 * Releases buffers held by this result partition.
 	 *
-	 * <p> This is a callback from the buffer pool, which is registered for result partitions, which
+	 * <p>This is a callback from the buffer pool, which is registered for result partitions, which
 	 * are back pressure-free.
 	 */
 	@Override
@@ -395,7 +395,7 @@ public String toString() {
 	/**
 	 * Pins the result partition.
 	 *
-	 * <p> The partition can only be released after each subpartition has been consumed once per pin
+	 * <p>The partition can only be released after each subpartition has been consumed once per pin
 	 * operation.
 	 */
 	void pin() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 02212ced905..10eb0868e87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
+/**
+ * Interface for notifications about consumable partitions.
+ */
 public interface ResultPartitionConsumableNotifier {
 	void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index b84c33b8afb..cee79a0d2a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -27,7 +27,7 @@
 /**
  * Runtime identifier of a produced {@link IntermediateResultPartition}.
  *
- * <p> In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
+ * <p>In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
  * identify a result partition. It needs to be associated with the producing task as well to ensure
  * correct tracking of failed/restarted tasks.
  */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index db72d63cf8e..faeaaf2f997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 
+/**
+ * Interface for creating result partitions.
+ */
 public interface ResultPartitionProvider {
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 256387c31f4..f62dbeebf64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+/**
+ * Type of a result partition.
+ */
 public enum ResultPartitionType {
 
 	BLOCKING(false, false, false),
@@ -27,12 +30,12 @@
 	/**
 	 * Pipelined partitions with a bounded (local) buffer pool.
 	 *
-	 * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
+	 * <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
 	 * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
 	 * overall network buffer pool size, this, however, still allows to be flexible with regards
 	 * to the total number of partitions by selecting an appropriately big network buffer pool size.
 	 *
-	 * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
+	 * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
 	 * no checkpoint barriers.
 	 */
 	PIPELINED_BOUNDED(true, true, true);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index adc0ed35a2f..58a140221e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -43,16 +43,16 @@
 	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
 	protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
 
-	/** The number of non-event buffers currently in this subpartition */
+	/** The number of non-event buffers currently in this subpartition. */
 	@GuardedBy("buffers")
 	private int buffersInBacklog;
 
 	// - Statistics ----------------------------------------------------------
 
-	/** The total number of buffers (both data and event buffers) */
+	/** The total number of buffers (both data and event buffers). */
 	private long totalNumberOfBuffers;
 
-	/** The total number of bytes (both data and event buffers) */
+	/** The total number of bytes (both data and event buffers). */
 	private long totalNumberOfBytes;
 
 	public ResultSubpartition(int index, ResultPartition parent) {
@@ -102,19 +102,19 @@ protected Throwable getFailureCause() {
 	 * @throws IOException
 	 * 		thrown in case of errors while adding the buffer
 	 */
-	abstract public boolean add(BufferConsumer bufferConsumer) throws IOException;
+	public abstract boolean add(BufferConsumer bufferConsumer) throws IOException;
 
-	abstract public void flush();
+	public abstract void flush();
 
-	abstract public void finish() throws IOException;
+	public abstract void finish() throws IOException;
 
-	abstract public void release() throws IOException;
+	public abstract void release() throws IOException;
 
-	abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+	public abstract ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
 
 	abstract int releaseMemory() throws IOException;
 
-	abstract public boolean isReleased();
+	public abstract boolean isReleased();
 
 	/**
 	 * Gets the number of non-event buffers in this subpartition.
@@ -132,7 +132,7 @@ public int getBuffersInBacklog() {
 	 * This method must not acquire locks or interfere with the task and network threads in
 	 * any way.
 	 */
-	abstract public int unsynchronizedGetNumberOfQueuedBuffers();
+	public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
 	/**
 	 * Decreases the number of non-event buffers by one after fetching a non-event
@@ -198,7 +198,6 @@ public int buffersInBacklog() {
 			return buffersInBacklog;
 		}
 
-
 		public boolean nextBufferIsEvent() {
 			return nextBufferIsEvent;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index b1ccd634704..a7559553857 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 2a7cedf6949..a08ecc21f7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -34,8 +34,8 @@
 
 /**
  * An input channel consumes a single {@link ResultSubpartitionView}.
- * <p>
- * For each channel, the consumption life cycle is as follows:
+ *
+ * <p>For each channel, the consumption life cycle is as follows:
  * <ol>
  * <li>{@link #requestSubpartition(int)}</li>
  * <li>{@link #getNextBuffer()}</li>
@@ -66,7 +66,7 @@
 
 	protected final Counter numBuffersIn;
 
-	/** The current backoff (in ms) */
+	/** The current backoff (in ms). */
 	private int currentBackoff;
 
 	protected InputChannel(
@@ -111,12 +111,12 @@ public ResultPartitionID getPartitionId() {
 
 	/**
 	 * Notifies the owning {@link SingleInputGate} that this channel became non-empty.
-	 * 
+	 *
 	 * <p>This is guaranteed to be called only when a Buffer was added to a previously
 	 * empty input channel. The notion of empty is atomically consistent with the flag
 	 * {@link BufferAndAvailability#moreAvailable()} when polling the next buffer
 	 * from this channel.
-	 * 
+	 *
 	 * <p><b>Note:</b> When the input channel observes an exception, this
 	 * method is called regardless of whether the channel was empty before. That ensures
 	 * that the parent InputGate will always be notified about the exception.
@@ -132,8 +132,8 @@ protected void notifyChannelNonEmpty() {
 	/**
 	 * Requests the queue with the specified index of the source intermediate
 	 * result partition.
-	 * <p>
-	 * The queue index to request depends on which sub task the channel belongs
+	 *
+	 * <p>The queue index to request depends on which sub task the channel belongs
 	 * to and is specified by the consumer of this channel.
 	 */
 	abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
@@ -149,8 +149,8 @@ protected void notifyChannelNonEmpty() {
 
 	/**
 	 * Sends a {@link TaskEvent} back to the task producing the consumed result partition.
-	 * <p>
-	 * <strong>Important</strong>: The producing task has to be running to receive backwards events.
+	 *
+	 * <p><strong>Important</strong>: The producing task has to be running to receive backwards events.
 	 * This means that the result type needs to be pipelined and the task logic has to ensure that
 	 * the producer will wait for all backwards events. Otherwise, this will lead to an Exception
 	 * at runtime.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
index ceeb83dab40..c1886de6905 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
@@ -22,6 +22,9 @@
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
+/**
+ * Identifier for input channels.
+ */
 public class InputChannelID extends AbstractID {
 
 	private static final long serialVersionUID = 1L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c78abb5165a..6e59f915730 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -26,10 +26,10 @@
 /**
  * An input gate consumes one or more partitions of a single produced intermediate result.
  *
- * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these
  * partitions is furthermore partitioned into one or more subpartitions.
  *
- * <p> As an example, consider a map-reduce program, where the map operator produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator produces data and the
  * reduce operator consumes the produced data.
  *
  * <pre>{@code
@@ -38,7 +38,7 @@
  * +-----+              +---------------------+              +--------+
  * }</pre>
  *
- * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its
  * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
  * subpartitions.
  *
@@ -59,7 +59,7 @@
  *               +-----------------------------------------+
  * }</pre>
  *
- * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting
  * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
  * subpartitions -- one for each parallel reduce subtask. As shown in the Figure, each reduce task
  * will have an input gate attached to it. This will provide its input, which will consist of one
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 69af4553fe4..ebb8b9d131b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -40,7 +40,7 @@ private InputGateMetrics(SingleInputGate inputGate) {
 
 	// ------------------------------------------------------------------------
 
-	// these methods are package private to make access from the nested classes faster 
+	// these methods are package private to make access from the nested classes faster
 
 	/**
 	 * Iterates over all input channels and collects the total number of queued buffers in a
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4b3a8ff9773..e7986bb381c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -57,7 +57,7 @@
 	/** Task event dispatcher for backwards events. */
 	private final TaskEventDispatcher taskEventDispatcher;
 
-	/** The consumed subpartition */
+	/** The consumed subpartition. */
 	private volatile ResultSubpartitionView subpartitionView;
 
 	private volatile boolean isReleased;
@@ -245,7 +245,7 @@ void notifySubpartitionConsumed() throws IOException {
 	}
 
 	/**
-	 * Releases the partition reader
+	 * Releases the partition reader.
 	 */
 	@Override
 	void releaseAllResources() throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 20a7aed76e2..c0e9177b294 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -89,8 +89,8 @@ public void sendTaskEvent(TaskEvent event) throws IOException {
 
 	/**
 	 * Returns <code>false</code>.
-	 * <p>
-	 * <strong>Important</strong>: It is important that the method correctly
+	 *
+	 * <p><strong>Important</strong>: It is important that the method correctly
 	 * always <code>false</code> for unknown input channels in order to not
 	 * finish the consumption of an intermediate result partition early.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index f73ede78dfd..f7db40bd921 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -59,7 +59,7 @@ public ResultSubpartitionView answer(InvocationOnMock invocation) throws Throwab
 
 		return manager;
 	}
-	
+
 	public static ConnectionManager createDummyConnectionManager() throws Exception {
 		final PartitionRequestClient mockClient = mock(PartitionRequestClient.class);
 
@@ -71,6 +71,6 @@ public static ConnectionManager createDummyConnectionManager() throws Exception
 
 	// ------------------------------------------------------------------------
 
-	/** This class is not meant to be instantiated */
+	/** This class is not meant to be instantiated. */
 	private InputChannelTestUtils() {}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 73f3cfbc49a..5c643af1739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -46,6 +47,9 @@
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Concurrency tests for input gates.
+ */
 public class InputGateConcurrentTest {
 
 	@Test
@@ -192,9 +196,11 @@ public void testConsumptionWithMixedChannels() throws Exception {
 	//  testing threads
 	// ------------------------------------------------------------------------
 
-	private static abstract class Source {
-	
+	private abstract static class Source {
+
 		abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+		abstract void flush();
 	}
 
 	private static class PipelinedSubpartitionSource extends Source {
@@ -209,6 +215,11 @@ public void testConsumptionWithMixedChannels() throws Exception {
 		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
 			partition.add(bufferConsumer);
 		}
+
+		@Override
+		void flush() {
+			partition.flush();
+		}
 	}
 
 	private static class RemoteChannelSource extends Source {
@@ -222,14 +233,19 @@ void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
 
 		@Override
 		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
-			checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
 			try {
-				channel.onBuffer(bufferConsumer.build(), seq++, -1);
+				Buffer buffer = bufferConsumer.build();
+				checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
+				channel.onBuffer(buffer, seq++, -1);
 			}
 			finally {
 				bufferConsumer.close();
 			}
 		}
+
+		@Override
+		void flush() {
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -245,6 +261,7 @@ void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
 		private final int yieldAfter;
 
 		ProducerThread(Source[] sources, int numTotal, int maxChunk, int yieldAfter) {
+			super("producer");
 			this.sources = sources;
 			this.numTotal = numTotal;
 			this.maxChunk = maxChunk;
@@ -273,7 +290,10 @@ public void go() throws Exception {
 					//noinspection CallToThreadYield
 					Thread.yield();
 				}
+			}
 
+			for (Source source : sources) {
+				source.flush();
 			}
 		}
 	}
@@ -284,6 +304,7 @@ public void go() throws Exception {
 		private final int numBuffers;
 
 		ConsumerThread(SingleInputGate gate, int numBuffers) {
+			super("consumer");
 			this.gate = gate;
 			this.numBuffers = numBuffers;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 82a27cc92c0..66918757462 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -58,6 +58,9 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests verifying fairness in input gates.
+ */
 public class InputGateFairnessTest {
 
 	@Test
@@ -115,7 +118,7 @@ public void testFairConsumptionLocalChannelsPreFilled() throws Exception {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 		}
 
 		assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -207,11 +210,11 @@ public void testFairConsumptionRemoteChannelsPreFilled() throws Exception {
 
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
-					gate, i, new ResultPartitionID(), mock(ConnectionID.class), 
+					gate, i, new ResultPartitionID(), mock(ConnectionID.class),
 					connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 			channels[i] = channel;
-			
+
 			for (int p = 0; p < buffersPerChannel; p++) {
 				channel.onBuffer(mockBuffer, p, -1);
 			}
@@ -233,7 +236,7 @@ public void testFairConsumptionRemoteChannelsPreFilled() throws Exception {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 		}
 
 		assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -287,7 +290,7 @@ public void testFairConsumptionRemoteChannels() throws Exception {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 
 			if (i % (2 * numChannels) == 0) {
 				// add three buffers to each channel, in random order
@@ -336,7 +339,7 @@ private void fillRandom(
 			partitions[i].onBuffer(buffer, sequenceNumbers[i]++, -1);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static class FairnessVerifyingInputGate extends SingleInputGate {
@@ -372,7 +375,6 @@ public FairnessVerifyingInputGate(
 			this.uniquenessChecker = new HashSet<>();
 		}
 
-
 		@Override
 		public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
 			synchronized (channelsWithData) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
index aecab759c78..b83067c11f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -40,14 +40,17 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
-	private final static int NUMBER_OF_TMS = 1;
-	private final static int NUMBER_OF_SLOTS_PER_TM = 1;
-	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+	private static final int NUMBER_OF_TMS = 1;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
-	private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+	private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
 
 	private static TestingCluster flink;
 
@@ -72,7 +75,7 @@ public static void tearDown() throws Exception {
 	/**
 	 * Tests a fix for FLINK-1930.
 	 *
-	 * <p> When consuming a pipelined result only partially, is is possible that local channels
+	 * <p>When consuming a pipelined result only partially, it is possible that local channels
 	 * release the buffer pool, which is associated with the result partition, too early.  If the
 	 * producer is still producing data when this happens, it runs into an IllegalStateException,
 	 * because of the destroyed buffer pool.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 2eec34cdae2..f6689fe72d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -40,6 +40,9 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class PartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
@@ -78,8 +81,8 @@ public static void tearDown() throws Exception {
 	/**
 	 * Tests a fix for FLINK-1930.
 	 *
-	 * <p> When consuming a pipelined result only partially, is is possible that local channels
-	 * release the buffer pool, which is associated with the result partition, too early.  If the
+	 * <p>When consuming a pipelined result only partially, it is possible that local channels
+	 * release the buffer pool, which is associated with the result partition, too early. If the
 	 * producer is still producing data when this happens, it runs into an IllegalStateException,
 	 * because of the destroyed buffer pool.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index bc66c9d292d..82f61ab493b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -25,7 +25,6 @@
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
@@ -41,17 +40,12 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -59,10 +53,15 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link PipelinedSubpartition}.
+ *
+ * @see PipelinedSubpartitionWithReadViewTest
+ */
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
-	/** Executor service for concurrent produce/consume tests */
-	private final static ExecutorService executorService = Executors.newCachedThreadPool();
+	/** Executor service for concurrent produce/consume tests. */
+	private static final ExecutorService executorService = Executors.newCachedThreadPool();
 
 	@AfterClass
 	public static void shutdownExecutorService() throws Exception {
@@ -76,146 +75,6 @@ PipelinedSubpartition createSubpartition() {
 		return new PipelinedSubpartition(0, parent);
 	}
 
-	@Test(expected = IllegalStateException.class)
-	public void testAddTwoNonFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			subpartition.add(createBufferBuilder().createBufferConsumer());
-			subpartition.add(createBufferBuilder().createBufferConsumer());
-			assertNull(readView.getNextBuffer());
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testAddEmptyNonFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			BufferBuilder bufferBuilder = createBufferBuilder();
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			assertEquals(0, availablityListener.getNumNotifications());
-			assertNull(readView.getNextBuffer());
-
-			bufferBuilder.finish();
-			bufferBuilder = createBufferBuilder();
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
-			assertNull(readView.getNextBuffer());
-			assertEquals(1, subpartition.getBuffersInBacklog());
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testAddNonEmptyNotFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			BufferBuilder bufferBuilder = createBufferBuilder();
-			bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			// note that since the buffer builder is not finished, there is still a retained instance!
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertEquals(1, subpartition.getBuffersInBacklog());
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would
-	 * busy loop on the unfinished BufferConsumers.
-	 */
-	@Test
-	public void testUnfinishedBufferBehindFinished() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-
-			assertNextBuffer(readView, 1025, false, 1, false, true);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some
-	 * of the data.
-	 */
-	@Test
-	public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-			subpartition.flush();
-
-			assertNextBuffer(readView, 1025, true, 1, false, true);
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testMultipleEmptyBuffers() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(0));
-
-			assertEquals(1, availablityListener.getNumNotifications());
-			subpartition.add(createFilledBufferConsumer(0));
-			assertEquals(2, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(0));
-			assertEquals(2, availablityListener.getNumNotifications());
-			assertEquals(3, subpartition.getBuffersInBacklog());
-
-			subpartition.add(createFilledBufferConsumer(1024));
-			assertEquals(2, availablityListener.getNumNotifications());
-
-			assertNextBuffer(readView, 1024, false, 0, false, true);
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
 	@Test
 	public void testIllegalReadViewRequest() throws Exception {
 		final PipelinedSubpartition subpartition = createSubpartition();
@@ -231,120 +90,23 @@ public void testIllegalReadViewRequest() throws Exception {
 		}
 	}
 
+	/**
+	 * Verifies that the isReleased() check of the view checks the parent
+	 * subpartition.
+	 */
 	@Test
-	public void testEmptyFlush() throws Exception {
-		final PipelinedSubpartition subpartition = createSubpartition();
+	public void testIsReleasedChecksParent() {
+		PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
 
-		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
-		subpartition.createReadView(listener);
-		subpartition.flush();
-		assertEquals(0, listener.getNumNotifications());
-	}
+		PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
+			subpartition, mock(BufferAvailabilityListener.class));
 
-	@Test
-	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
-		final PipelinedSubpartition subpartition = createSubpartition();
+		assertFalse(reader.isReleased());
+		verify(subpartition, times(1)).isReleased();
 
-		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-
-		ResultSubpartitionView view = subpartition.createReadView(listener);
-
-		// Empty => should return null
-		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
-		assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
-		verify(listener, times(0)).notifyDataAvailable();
-
-		// Add data to the queue...
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(1, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-
-		// ...should have resulted in a notification
-		verify(listener, times(1)).notifyDataAvailable();
-
-		// ...and one available result
-		assertFalse(view.nextBufferIsEvent());
-		BufferAndBacklog read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
-		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// Add data to the queue...
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(2, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		verify(listener, times(2)).notifyDataAvailable();
-
-		assertFalse(view.nextBufferIsEvent());
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
-		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// some tests with events
-
-		// fill with: buffer, event , and buffer
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(5, subpartition.getTotalNumberOfBuffers());
-		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
-		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		verify(listener, times(4)).notifyDataAvailable();
-
-		assertFalse(view.nextBufferIsEvent()); // the first buffer
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
-		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertTrue(read.nextBufferIsEvent());
-
-		assertTrue(view.nextBufferIsEvent()); // the event
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertFalse(read.buffer().isBuffer());
-		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-
-		assertFalse(view.nextBufferIsEvent()); // the remaining buffer
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
-		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-
-		assertEquals(5, subpartition.getTotalNumberOfBuffers());
-		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
-		verify(listener, times(4)).notifyDataAvailable();
+		when(subpartition.isReleased()).thenReturn(true);
+		assertTrue(reader.isReleased());
+		verify(subpartition, times(2)).isReleased();
 	}
 
 	@Test
@@ -367,25 +129,6 @@ public void testConcurrentSlowProduceAndSlowConsume() throws Exception {
 		testProduceConsume(true, true);
 	}
 
-	/**
-	 * Verifies that the isReleased() check of the view checks the parent
-	 * subpartition.
-	 */
-	@Test
-	public void testIsReleasedChecksParent() throws Exception {
-		PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
-
-		PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
-				subpartition, mock(BufferAvailabilityListener.class));
-
-		assertFalse(reader.isReleased());
-		verify(subpartition, times(1)).isReleased();
-
-		when(subpartition.isReleased()).thenReturn(true);
-		assertTrue(reader.isReleased());
-		verify(subpartition, times(2)).isReleased();
-	}
-
 	private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
 		// Config
 		final int producerBufferPoolSize = 8;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
new file mode 100644
index 00000000000..6f9920ebc39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for {@link PipelinedSubpartition} which require an availability listener and a
+ * read view.
+ *
+ * @see PipelinedSubpartitionTest
+ */
+public class PipelinedSubpartitionWithReadViewTest {
+
+	private PipelinedSubpartition subpartition;
+	private AwaitableBufferAvailablityListener availablityListener;
+	private PipelinedSubpartitionView readView;
+
+	@Before
+	public void setup() throws IOException {
+		final ResultPartition parent = mock(ResultPartition.class);
+		subpartition = new PipelinedSubpartition(0, parent);
+		availablityListener = new AwaitableBufferAvailablityListener();
+		readView = subpartition.createReadView(availablityListener);
+	}
+
+	@After
+	public void tearDown() {
+		readView.releaseAllResources();
+		subpartition.release();
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testAddTwoNonFinishedBuffer() {
+		subpartition.add(createBufferBuilder().createBufferConsumer());
+		subpartition.add(createBufferBuilder().createBufferConsumer());
+		assertNull(readView.getNextBuffer());
+	}
+
+	@Test
+	public void testAddEmptyNonFinishedBuffer() {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		assertEquals(0, availablityListener.getNumNotifications());
+		assertNull(readView.getNextBuffer());
+
+		bufferBuilder.finish();
+		bufferBuilder = createBufferBuilder();
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
+		assertNull(readView.getNextBuffer());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+	}
+
+	@Test
+	public void testAddNonEmptyNotFinishedBuffer() throws Exception {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		// note that since the buffer builder is not finished, there is still a retained instance!
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertEquals(1, subpartition.getBuffersInBacklog());
+	}
+
+	/**
+	 * Normally, the moreAvailable flag from InputChannel should ignore non-finished BufferConsumers; otherwise
+	 * we would busy-loop on the unfinished BufferConsumers.
+	 */
+	@Test
+	public void testUnfinishedBufferBehindFinished() throws Exception {
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+		assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
+		assertNextBuffer(readView, 1025, false, 1, false, true);
+		// not notified, but we could still access the unfinished buffer
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	/**
+	 * After a flush call, unfinished BufferConsumers should be reported as available; otherwise we might not flush
+	 * some of the data.
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+		long oldNumNotifications = availablityListener.getNumNotifications();
+		subpartition.flush();
+		// buffer queue is > 1, should already be notified, no further notification necessary
+		assertThat(oldNumNotifications, greaterThan(0L));
+		assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1025, true, 1, false, true);
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	/**
+	 * A flush call with a buffer queue size of 1 should always notify consumers (unless already flushed).
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+		// no buffers -> no notification or any other effects
+		subpartition.flush();
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+		assertNextBuffer(readView, 1025, false, 1, false, true);
+
+		long oldNumNotifications = availablityListener.getNumNotifications();
+		subpartition.flush();
+		// buffer queue is 1 again -> need to flush
+		assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+		subpartition.flush();
+		// calling again should not flush again
+		assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	@Test
+	public void testMultipleEmptyBuffers() throws Exception {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(0));
+
+		assertEquals(1, availablityListener.getNumNotifications());
+		subpartition.add(createFilledBufferConsumer(0));
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(0));
+		assertEquals(2, availablityListener.getNumNotifications());
+		assertEquals(3, subpartition.getBuffersInBacklog());
+
+		subpartition.add(createFilledBufferConsumer(1024));
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1024, false, 0, false, true);
+	}
+
+	@Test
+	public void testEmptyFlush()  {
+		subpartition.flush();
+		assertEquals(0, availablityListener.getNumNotifications());
+	}
+
+	@Test
+	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
+		// Empty => should return null
+		assertFalse(readView.nextBufferIsEvent());
+		assertNoNextBuffer(readView);
+		assertFalse(readView.nextBufferIsEvent()); // also after getNextBuffer()
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		// Add data to the queue...
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(1, subpartition.getTotalNumberOfBuffers());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+
+		// ...should have resulted in a notification
+		assertEquals(1, availablityListener.getNumNotifications());
+
+		// ...and one available result
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// Add data to the queue...
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(2, subpartition.getTotalNumberOfBuffers());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// some tests with events
+
+		// fill with: buffer, event, and buffer
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(5, subpartition.getTotalNumberOfBuffers());
+		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(4, availablityListener.getNumNotifications());
+
+		// the first buffer
+		assertNextBuffer(readView, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
+		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(1, subpartition.getBuffersInBacklog());
+
+		// the event
+		assertNextEvent(readView, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
+		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(1, subpartition.getBuffersInBacklog());
+
+		// the remaining buffer
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// nothing more
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		assertEquals(5, subpartition.getTotalNumberOfBuffers());
+		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+		assertEquals(4, availablityListener.getNumNotifications());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 6bff0f618c0..d182f119360 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -26,6 +26,9 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link ProducerFailedException}.
+ */
 public class ProducerFailedExceptionTest {
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 817795ce7ea..57d2cd648ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -228,24 +228,20 @@ public void testConsumeSpilledPartition() throws Exception {
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		assertEquals(1, listener.getNumNotifications());
-
 		assertFalse(reader.nextBufferIsEvent()); // buffer
+
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
 		assertEquals(2, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // event
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // end of partition event
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
@@ -314,24 +310,20 @@ public void testConsumeSpilledPartitionSpilledBeforeAdd() throws Exception {
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		assertEquals(1, listener.getNumNotifications());
-
 		assertFalse(reader.nextBufferIsEvent()); // full buffer
+
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
 		assertEquals(2, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // full buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // event
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // partial buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // end of partition event
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
@@ -370,6 +362,7 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 		assertFalse(bufferConsumer.isRecycled());
 
 		assertFalse(reader.nextBufferIsEvent());
+
 		// first buffer (non-spilled)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
 		assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
@@ -397,19 +390,19 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 		Buffer buffer = bufferConsumer.build();
 		buffer.retainBuffer();
 
-		assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
+		// second buffer (retained in SpillableSubpartition#nextBuffer)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
 		assertEquals(1, partition.getBuffersInBacklog());
 
 		bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
 
-		assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
+		// the event (spilled)
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled)
+		// last buffer (spilled)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
@@ -418,7 +411,6 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 		assertTrue(buffer.isRecycled());
 
 		// End of partition
-		assertTrue(reader.nextBufferIsEvent());
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 8c902157da9..9f5e6d0080f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -207,6 +207,8 @@ private static void assertNextBufferOrEvent(
 			assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
 			assertEquals("next is event", expectedNextBufferIsEvent,
 				bufferAndBacklog.nextBufferIsEvent());
+			assertEquals("next is event", expectedNextBufferIsEvent,
+				readView.nextBufferIsEvent());
 
 			assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
 		} finally {
@@ -215,7 +217,7 @@ private static void assertNextBufferOrEvent(
 		assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
 	}
 
-	protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
+	static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
 		assertNull(readView.getNextBuffer());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index d757aa9c0f5..2f5a013c56a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -31,6 +32,9 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link InputChannel}.
+ */
 public class InputChannelTest {
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index a91473327d0..a67df0b41dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -36,6 +36,11 @@
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
+/**
+ * Input gate helper for unit tests.
+ *
+ * @param <T> type of the value to handle
+ */
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
 
 	private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 1ecb67ff82c..2afd6d4aff9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -75,12 +75,15 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link LocalInputChannel}.
+ */
 public class LocalInputChannelTest {
 
 	/**
 	 * Tests the consumption of multiple subpartitions via local input channels.
 	 *
-	 * <p> Multiple producer tasks produce pipelined partitions, which are consumed by multiple
+	 * <p>Multiple producer tasks produce pipelined partitions, which are consumed by multiple
 	 * tasks via local input channels.
 	 */
 	@Test
@@ -266,20 +269,22 @@ public void testProducerFailedException() throws Exception {
 	 * Verifies that concurrent release via the SingleInputGate and re-triggering
 	 * of a partition request works smoothly.
 	 *
-	 * - SingleInputGate acquires its request lock and tries to release all
+	 * <ul>
+	 * <li>SingleInputGate acquires its request lock and tries to release all
 	 * registered channels. When releasing a channel, it needs to acquire
-	 * the channel's shared request-release lock.
-	 * - If a LocalInputChannel concurrently retriggers a partition request via
+	 * the channel's shared request-release lock.</li>
+	 * <li>If a LocalInputChannel concurrently retriggers a partition request via
 	 * a Timer Thread it acquires the channel's request-release lock and calls
 	 * the retrigger callback on the SingleInputGate, which again tries to
-	 * acquire the gate's request lock.
+	 * acquire the gate's request lock.</li>
+	 * </ul>
 	 *
-	 * For certain timings this obviously leads to a deadlock. This test reliably
+	 * <p>For certain timings this obviously leads to a deadlock. This test reliably
 	 * reproduced such a timing (reported in FLINK-5228). This test is pretty much
 	 * testing the buggy implementation and has not much more general value. If it
 	 * becomes obsolete at some point (future greatness ;)), feel free to remove it.
 	 *
-	 * The fix in the end was to to not acquire the channels lock when releasing it
+	 * <p>The fix in the end was to not acquire the channel's lock when releasing it
 	 * and/or not doing any input gate callbacks while holding the channel's lock.
 	 * I decided to do both.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9141b36d445..ec80459f0ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -336,9 +336,11 @@ public void testProducerFailedException() throws Exception {
 	 * Tests to verify the behaviours of three different processes if the number of available
 	 * buffers is less than required buffers.
 	 *
-	 * 1. Recycle the floating buffer
-	 * 2. Recycle the exclusive buffer
-	 * 3. Decrease the sender's backlog
+	 * <ol>
+	 * <li>Recycle the floating buffer</li>
+	 * <li>Recycle the exclusive buffer</li>
+	 * <li>Decrease the sender's backlog</li>
+	 * </ol>
 	 */
 	@Test
 	public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 71203276dd9..4bf5b220be8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -164,7 +164,7 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception {
 
 		final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
 		when(iterator.getNextBuffer()).thenReturn(
-			new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false,0, false));
+			new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false, 0, false));
 
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager.createSubpartitionView(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index b0bafd505ec..33f570917b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 2e012253d94..96f01fd5667 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -32,13 +32,16 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link UnionInputGate}.
+ */
 public class UnionInputGateTest {
 
 	/**
 	 * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
 	 * value after receiving all end-of-partition events.
 	 *
-	 * <p> For buffer-or-event instances, it is important to verify that they have been set off to
+	 * <p>For buffer-or-event instances, it is important to verify that they have been set off to
 	 * the correct logical index.
 	 */
 	@Test(timeout = 120 * 1000)
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 33a92e3075f..51793e510ce 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -87,11 +87,11 @@ under the License.
 		files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<suppress
-		files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
 	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<!--Test class copied from the netty project-->
 	<suppress


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services