You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:45 UTC

[flink] branch master updated (e7ac3ba -> c15ba1c)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e7ac3ba  [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock
     new a5e1d40  [hotfix][checkstyle] Remove suppression for runtime/network.partition
     new bbb8b0a  [hotfix][network] ensure deserialization buffer capacity for the whole record length
     new 213e085  [hotfix][network] some minor improvements around the network stack
     new 51f53d8  [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer
     new 9d2b74b  [hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees
     new d9c49c1  [hotfix][network][tests] add readView.nextBufferIsEvent to assertNextBufferOrEvent()
     new b728253  [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest
     new a9e5f70  [FLINK-10331][network] reduce unnecessary flushing
     new 72d522e  [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization
     new bda07fc  [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers
     new c15ba1c  [FLINK-10332][network] move data notification out of the synchronized block

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...SpillingAdaptiveSpanningRecordDeserializer.java |  12 +-
 .../runtime/io/network/buffer/BufferBuilder.java   |  25 +-
 .../runtime/io/network/buffer/BufferConsumer.java  |  23 +-
 .../runtime/io/network/buffer/NetworkBuffer.java   |   2 +-
 .../CreditBasedPartitionRequestClientHandler.java  |  14 +-
 .../netty/PartitionRequestClientHandler.java       |   7 +-
 .../io/network/netty/PartitionRequestQueue.java    |   7 +-
 .../partition/PartitionNotFoundException.java      |   3 +
 .../network/partition/PipelinedSubpartition.java   |  76 ++++--
 .../io/network/partition/ResultPartition.java      |  26 +-
 .../ResultPartitionConsumableNotifier.java         |   4 +-
 .../io/network/partition/ResultPartitionID.java    |   2 +-
 .../network/partition/ResultPartitionProvider.java |   3 +
 .../io/network/partition/ResultPartitionType.java  |   7 +-
 .../io/network/partition/ResultSubpartition.java   |  21 +-
 .../network/partition/ResultSubpartitionView.java  |   1 +
 .../network/partition/consumer/InputChannel.java   |  18 +-
 .../network/partition/consumer/InputChannelID.java |   3 +
 .../io/network/partition/consumer/InputGate.java   |   8 +-
 .../partition/consumer/InputGateMetrics.java       |   2 +-
 .../partition/consumer/LocalInputChannel.java      |   4 +-
 .../partition/consumer/UnknownInputChannel.java    |   6 +-
 .../network/partition/InputChannelTestUtils.java   |   4 +-
 .../network/partition/InputGateConcurrentTest.java |  29 +-
 .../network/partition/InputGateFairnessTest.java   |  16 +-
 .../LegacyPartialConsumePipelinedResultTest.java   |  13 +-
 .../PartialConsumePipelinedResultTest.java         |   7 +-
 .../partition/PipelinedSubpartitionTest.java       | 297 ++-------------------
 .../PipelinedSubpartitionWithReadViewTest.java     | 276 +++++++++++++++++++
 .../partition/ProducerFailedExceptionTest.java     |   3 +
 .../partition/SpillableSubpartitionTest.java       |  20 +-
 .../io/network/partition/SubpartitionTestBase.java |   4 +-
 .../partition/consumer/InputChannelTest.java       |   4 +
 .../IteratorWrappingTestSingleInputGate.java       |   5 +
 .../partition/consumer/LocalInputChannelTest.java  |  19 +-
 .../partition/consumer/RemoteInputChannelTest.java |   8 +-
 .../partition/consumer/SingleInputGateTest.java    |   2 +-
 .../partition/consumer/TestSingleInputGate.java    |   1 +
 .../partition/consumer/UnionInputGateTest.java     |   5 +-
 tools/maven/suppressions-runtime.xml               |   4 +-
 40 files changed, 542 insertions(+), 449 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java


[flink] 11/11: [FLINK-10332][network] move data notification out of the synchronized block

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c15ba1cd5a97ced3bb6411d588c3fb68df8a2869
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200

    [FLINK-10332][network] move data notification out of the synchronized block
    
    None of the notifications actually rely on being under the lock and may thus
    only cause lock contention.
    
    This closes #6693.
---
 .../network/partition/PipelinedSubpartition.java   | 44 +++++++++++-----------
 1 file changed, 22 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index d2d7fdb..fe27d97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -88,6 +88,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	private boolean add(BufferConsumer bufferConsumer, boolean finish) {
 		checkNotNull(bufferConsumer);
 
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
 				bufferConsumer.close();
@@ -98,14 +99,13 @@ class PipelinedSubpartition extends ResultSubpartition {
 			buffers.add(bufferConsumer);
 			updateStatistics(bufferConsumer);
 			increaseBuffersInBacklog(bufferConsumer);
+			notifyDataAvailable = shouldNotifyDataAvailable() || finish;
 
-			if (finish) {
-				isFinished = true;
-				notifyDataAvailable();
-			}
-			else {
-				maybeNotifyDataAvailable();
-			}
+			isFinished |= finish;
+		}
+
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 
 		return true;
@@ -220,6 +220,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			checkState(!isReleased);
 			checkState(readView == null,
@@ -230,9 +231,10 @@ class PipelinedSubpartition extends ResultSubpartition {
 				parent.getOwningTaskName(), index, parent.getPartitionId());
 
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
-			if (!buffers.isEmpty()) {
-				notifyDataAvailable();
-			}
+			notifyDataAvailable = !buffers.isEmpty();
+		}
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 
 		return readView;
@@ -283,26 +285,24 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public void flush() {
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			if (buffers.isEmpty()) {
 				return;
 			}
-			if (!flushRequested) {
-				flushRequested = true; // set this before the notification!
-				// if there is more then 1 buffer, we already notified the reader
-				// (at the latest when adding the second buffer)
-				if (buffers.size() == 1) {
-					notifyDataAvailable();
-				}
-			}
+			// if there is more then 1 buffer, we already notified the reader
+			// (at the latest when adding the second buffer)
+			notifyDataAvailable = !flushRequested && buffers.size() == 1;
+			flushRequested = true;
+		}
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 	}
 
-	private void maybeNotifyDataAvailable() {
+	private boolean shouldNotifyDataAvailable() {
 		// Notify only when we added first finished buffer.
-		if (getNumberOfFinishedBuffers() == 1) {
-			notifyDataAvailable();
-		}
+		return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1;
 	}
 
 	private void notifyDataAvailable() {


[flink] 08/11: [FLINK-10331][network] reduce unnecessary flushing

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9e5f70515a3cbad7e776b9248e841f6c81c9b7a
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 11:13:55 2018 +0200

    [FLINK-10331][network] reduce unnecessary flushing
    
    Do not flush (again) if
    - a previous flush request has not been completely handled yet and/or is still enqueued or
    - the network stack is still polling from this subpartition and doesn't need a new notification
    
    This closes #6692.
---
 .../network/partition/PipelinedSubpartition.java   | 46 ++++++++++++++++------
 .../partition/PipelinedSubpartitionTest.java       | 45 +++++++++++++++++++++
 2 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 91e0d4b..d2d7fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -36,6 +36,19 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A pipelined in-memory only subpartition, which can be consumed once.
+ *
+ * <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second
+ * {@link BufferConsumer} (in which case we will assume the first one finished), we will
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via
+ * {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling
+ * {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and
+ * then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows
+ * no more buffers being available. This results in a buffer queue which is either empty or has an
+ * unfinished {@link BufferConsumer} left from which the notifications will eventually start again.
+ *
+ * <p>Explicit calls to {@link #flush()} will force this
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
+ * {@link BufferConsumer} present in the queue.
  */
 class PipelinedSubpartition extends ResultSubpartition {
 
@@ -67,17 +80,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public void flush() {
-		synchronized (buffers) {
-			if (buffers.isEmpty()) {
-				return;
-			}
-			flushRequested = !buffers.isEmpty();
-			notifyDataAvailable();
-		}
-	}
-
-	@Override
 	public void finish() throws IOException {
 		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
 		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
@@ -99,7 +101,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 			if (finish) {
 				isFinished = true;
-				flush();
+				notifyDataAvailable();
 			}
 			else {
 				maybeNotifyDataAvailable();
@@ -279,6 +281,23 @@ class PipelinedSubpartition extends ResultSubpartition {
 		return Math.max(buffers.size(), 0);
 	}
 
+	@Override
+	public void flush() {
+		synchronized (buffers) {
+			if (buffers.isEmpty()) {
+				return;
+			}
+			if (!flushRequested) {
+				flushRequested = true; // set this before the notification!
+				// if there is more then 1 buffer, we already notified the reader
+				// (at the latest when adding the second buffer)
+				if (buffers.size() == 1) {
+					notifyDataAvailable();
+				}
+			}
+		}
+	}
+
 	private void maybeNotifyDataAvailable() {
 		// Notify only when we added first finished buffer.
 		if (getNumberOfFinishedBuffers() == 1) {
@@ -295,6 +314,9 @@ class PipelinedSubpartition extends ResultSubpartition {
 	private int getNumberOfFinishedBuffers() {
 		assert Thread.holdsLock(buffers);
 
+		// NOTE: isFinished() is not guaranteed to provide the most up-to-date state here
+		// worst-case: a single finished buffer sits around until the next flush() call
+		// (but we do not offer stronger guarantees anyway)
 		if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
 			return 1;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 90bdb82..b75bb7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -47,6 +47,8 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -160,7 +162,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 			subpartition.add(createFilledBufferConsumer(1025)); // finished
 			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
 
+			assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
 			assertNextBuffer(readView, 1025, false, 1, false, true);
+			// not notified, but we could still access the unfinished buffer
+			assertNextBuffer(readView, 1024, false, 1, false, false);
+			assertNoNextBuffer(readView);
 		} finally {
 			subpartition.release();
 		}
@@ -179,10 +185,49 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		try {
 			subpartition.add(createFilledBufferConsumer(1025)); // finished
 			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+			long oldNumNotifications = availablityListener.getNumNotifications();
 			subpartition.flush();
+			// buffer queue is > 1, should already be notified, no further notification necessary
+			assertThat(oldNumNotifications, greaterThan(0L));
+			assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
 			assertNextBuffer(readView, 1025, true, 1, false, true);
 			assertNextBuffer(readView, 1024, false, 1, false, false);
+			assertNoNextBuffer(readView);
+		} finally {
+			subpartition.release();
+		}
+	}
+
+	/**
+	 * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+		final ResultSubpartition subpartition = createSubpartition();
+		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
+		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
+
+		try {
+			// no buffers -> no notification or any other effects
+			subpartition.flush();
+			assertEquals(0, availablityListener.getNumNotifications());
+
+			subpartition.add(createFilledBufferConsumer(1025)); // finished
+			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+			assertNextBuffer(readView, 1025, false, 1, false, true);
+
+			long oldNumNotifications = availablityListener.getNumNotifications();
+			subpartition.flush();
+			// buffer queue is 1 again -> need to flush
+			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+			subpartition.flush();
+			// calling again should not flush again
+			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+
+			assertNextBuffer(readView, 1024, false, 1, false, false);
+			assertNoNextBuffer(readView);
 		} finally {
 			subpartition.release();
 		}


[flink] 02/11: [hotfix][network] ensure deserialization buffer capacity for the whole record length

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbb8b0a10f4f48c47ffbeb325c7e3ce0203a552f
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:14:05 2018 +0200

    [hotfix][network] ensure deserialization buffer capacity for the whole record length
    
    Once we know the record length and if we are not spilling, we should size the
    buffer immediately to the expected record size, and not incrementally for each
    received buffer chunk.
---
 .../serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 41ee03d..196287b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -485,7 +485,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			}
 			else {
 				// collect in memory
-				ensureBufferCapacity(numBytesChunk);
+				ensureBufferCapacity(nextRecordLength);
 				partial.segment.get(partial.position, buffer, 0, numBytesChunk);
 			}
 
@@ -515,6 +515,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					segmentRemaining -= toPut;
 					if (this.recordLength > THRESHOLD_FOR_SPILLING) {
 						this.spillingChannel = createSpillingChannel();
+					} else {
+						ensureBufferCapacity(this.recordLength);
 					}
 				}
 			}
@@ -527,9 +529,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				// spill to file
 				ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
 				this.spillingChannel.write(toWrite);
-			}
-			else {
-				ensureBufferCapacity(accumulatedRecordBytes + toCopy);
+			} else {
 				segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
 			}
 


[flink] 04/11: [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51f53d8aeb7b9b4db8f3f4dde0c0f6ab6b626d2f
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:24:37 2018 +0200

    [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer
---
 .../runtime/io/network/buffer/BufferBuilder.java   | 25 +++++++++++-----------
 .../runtime/io/network/buffer/BufferConsumer.java  | 23 ++++++++++++++------
 2 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f184..6fb067e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public class BufferBuilder {
 	 * @return number of written bytes.
 	 */
 	public int finish() {
-		positionMarker.markFinished();
+		int writtenBytes = positionMarker.markFinished();
 		commit();
-		return getWrittenBytes();
+		return writtenBytes;
 	}
 
 	public boolean isFinished() {
@@ -118,18 +118,10 @@ public class BufferBuilder {
 		return positionMarker.getCached() == getMaxCapacity();
 	}
 
-	public boolean isEmpty() {
-		return positionMarker.getCached() == 0;
-	}
-
 	public int getMaxCapacity() {
 		return memorySegment.size();
 	}
 
-	private int getWrittenBytes() {
-		return positionMarker.getCached();
-	}
-
 	/**
 	 * Holds a reference to the current writer position. Negative values indicate that writer ({@link BufferBuilder}
 	 * has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer.
@@ -156,7 +148,7 @@ public class BufferBuilder {
 	 * Cached writing implementation of {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 *
 	 * <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible.
 	 */
@@ -181,12 +173,19 @@ public class BufferBuilder {
 			return PositionMarker.getAbsolute(cachedPosition);
 		}
 
-		public void markFinished() {
-			int newValue = -getCached();
+		/**
+		 * Marks this position as finished and returns the current position.
+		 *
+		 * @return current position as of {@link #getCached()}
+		 */
+		public int markFinished() {
+			int currentPosition = getCached();
+			int newValue = -currentPosition;
 			if (newValue == 0) {
 				newValue = FINISHED_EMPTY;
 			}
 			set(newValue);
+			return currentPosition;
 		}
 
 		public void move(int offset) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index f368ff0..3117fe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -42,7 +42,7 @@ public class BufferConsumer implements Closeable {
 
 	private final CachedPositionMarker writerPosition;
 
-	private int currentReaderPosition = 0;
+	private int currentReaderPosition;
 
 	/**
 	 * Constructs {@link BufferConsumer} instance with content that can be changed by {@link BufferBuilder}.
@@ -74,6 +74,14 @@ public class BufferConsumer implements Closeable {
 		this.currentReaderPosition = currentReaderPosition;
 	}
 
+	/**
+	 * Checks whether the {@link BufferBuilder} has already been finished.
+	 *
+	 * <p>BEWARE: this method accesses the cached value of the position marker which is only updated
+	 * after calls to {@link #build()}!
+	 *
+	 * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> otherwise
+	 */
 	public boolean isFinished() {
 		return writerPosition.isFinished();
 	}
@@ -84,16 +92,17 @@ public class BufferConsumer implements Closeable {
 	 */
 	public Buffer build() {
 		writerPosition.update();
-		Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition);
-		currentReaderPosition = writerPosition.getCached();
+		int cachedWriterPosition = writerPosition.getCached();
+		Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+		currentReaderPosition = cachedWriterPosition;
 		return slice.retainBuffer();
 	}
 
 	/**
 	 * Returns a retained copy with separate indexes. This allows to read from the same {@link MemorySegment} twice.
 	 *
-	 * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In
-	 * other words, data already consumed before copying will not be visible to the returned copies.
+	 * <p>WARNING: the newly returned {@link BufferConsumer} will have its reader index copied from the original buffer.
+	 * In other words, data already consumed before copying will not be visible to the returned copies.
 	 *
 	 * @return a retained copy of self with separate indexes
 	 */
@@ -124,7 +133,7 @@ public class BufferConsumer implements Closeable {
 	 * Cached reading wrapper around {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 */
 	private static class CachedPositionMarker {
 		private final PositionMarker positionMarker;
@@ -134,7 +143,7 @@ public class BufferConsumer implements Closeable {
 		 */
 		private int cachedPosition;
 
-		public CachedPositionMarker(PositionMarker positionMarker) {
+		CachedPositionMarker(PositionMarker positionMarker) {
 			this.positionMarker = checkNotNull(positionMarker);
 			update();
 		}


[flink] 05/11: [hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d2b74bf502cac3274cd6f2dc72db56c056ff92b
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:25:24 2018 +0200

    [hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees
    
    - producers should flush after writing to make sure all data has been sent
    - we can only check bufferConsumer.isFinished() after building a Buffer
    - producer/consumer threads should be named
---
 .../network/partition/InputGateConcurrentTest.java | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 5f5728d..5c643af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -198,6 +199,8 @@ public class InputGateConcurrentTest {
 	private abstract static class Source {
 
 		abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+		abstract void flush();
 	}
 
 	private static class PipelinedSubpartitionSource extends Source {
@@ -212,6 +215,11 @@ public class InputGateConcurrentTest {
 		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
 			partition.add(bufferConsumer);
 		}
+
+		@Override
+		void flush() {
+			partition.flush();
+		}
 	}
 
 	private static class RemoteChannelSource extends Source {
@@ -225,14 +233,19 @@ public class InputGateConcurrentTest {
 
 		@Override
 		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
-			checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
 			try {
-				channel.onBuffer(bufferConsumer.build(), seq++, -1);
+				Buffer buffer = bufferConsumer.build();
+				checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
+				channel.onBuffer(buffer, seq++, -1);
 			}
 			finally {
 				bufferConsumer.close();
 			}
 		}
+
+		@Override
+		void flush() {
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -248,6 +261,7 @@ public class InputGateConcurrentTest {
 		private final int yieldAfter;
 
 		ProducerThread(Source[] sources, int numTotal, int maxChunk, int yieldAfter) {
+			super("producer");
 			this.sources = sources;
 			this.numTotal = numTotal;
 			this.maxChunk = maxChunk;
@@ -276,7 +290,10 @@ public class InputGateConcurrentTest {
 					//noinspection CallToThreadYield
 					Thread.yield();
 				}
+			}
 
+			for (Source source : sources) {
+				source.flush();
 			}
 		}
 	}
@@ -287,6 +304,7 @@ public class InputGateConcurrentTest {
 		private final int numBuffers;
 
 		ConsumerThread(SingleInputGate gate, int numBuffers) {
+			super("consumer");
 			this.gate = gate;
 			this.numBuffers = numBuffers;
 		}


[flink] 07/11: [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b728253a7af6b51f6b749016cacc6e2e60f94cc8
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 18:47:25 2018 +0200

    [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest
---
 .../partition/PipelinedSubpartitionTest.java       | 53 +++++++---------------
 1 file changed, 16 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index fc9a643..90bdb82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
@@ -254,7 +253,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		// Empty => should return null
 		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
+		assertNoNextBuffer(view);
 		assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
 		verify(listener, times(0)).notifyDataAvailable();
 
@@ -270,16 +269,10 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		verify(listener, times(1)).notifyDataAvailable();
 
 		// ...and one available result
-		assertFalse(view.nextBufferIsEvent());
-		BufferAndBacklog read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
 		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
+		assertNoNextBuffer(view);
 		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		// Add data to the queue...
@@ -291,21 +284,15 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		verify(listener, times(2)).notifyDataAvailable();
 
-		assertFalse(view.nextBufferIsEvent());
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
 		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
+		assertNoNextBuffer(view);
 		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		// some tests with events
 
-		// fill with: buffer, event , and buffer
+		// fill with: buffer, event, and buffer
 		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
 		assertFalse(view.nextBufferIsEvent());
 		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
@@ -318,32 +305,24 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		verify(listener, times(4)).notifyDataAvailable();
 
-		assertFalse(view.nextBufferIsEvent()); // the first buffer
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		// the first buffer
+		assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
 		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertTrue(read.nextBufferIsEvent());
 
-		assertTrue(view.nextBufferIsEvent()); // the event
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertFalse(read.buffer().isBuffer());
+		// the event
+		assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
 		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
 
-		assertFalse(view.nextBufferIsEvent()); // the remaining buffer
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		// the remaining buffer
+		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
 		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
+
+		// nothing more
+		assertNoNextBuffer(view);
+		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		assertEquals(5, subpartition.getTotalNumberOfBuffers());
 		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());


[flink] 03/11: [hotfix][network] some minor improvements around the network stack

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 213e085cc75174e458a35e669059aa142e971d6b
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:23:33 2018 +0200

    [hotfix][network] some minor improvements around the network stack
---
 .../api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 4 ++--
 .../org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java     | 2 +-
 .../io/network/netty/CreditBasedPartitionRequestClientHandler.java    | 3 +--
 .../flink/runtime/io/network/netty/PartitionRequestClientHandler.java | 3 +--
 4 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 196287b..8630ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -502,8 +502,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			int segmentRemaining = numBytes;
 			// check where to go. if we have a partial length, we need to complete it first
 			if (this.lengthBuffer.position() > 0) {
-				int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
-				segment.get(offset, this.lengthBuffer, toPut);
+				int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+				segment.get(segmentPosition, this.lengthBuffer, toPut);
 				// did we complete the length?
 				if (this.lengthBuffer.hasRemaining()) {
 					return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 489be39..05b7582 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -93,7 +93,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 
 	/**
 	 * Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for
-	 * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+	 * the <tt>readerIndex</tt> and <tt>size</tt> as <tt>writerIndex</tt>.
 	 *
 	 * @param memorySegment
 	 * 		backing memory segment (defines {@link #maxCapacity})
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 9aa3920..90daf75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -327,8 +327,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
-				buffer.setSize(receivedSize);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 367c62d..796e86f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -337,8 +337,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
 				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
-				buffer.setSize(receivedSize);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
 


[flink] 06/11: [hotfix][network][tests] add readView.nextBufferIsEvent to assertNextBufferOrEvent()

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d9c49c17491a0a18eb267fa476c98e5359a560a6
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 18:44:49 2018 +0200

    [hotfix][network][tests] add readView.nextBufferIsEvent to assertNextBufferOrEvent()
---
 .../network/partition/SpillableSubpartitionTest.java | 20 ++++++--------------
 .../io/network/partition/SubpartitionTestBase.java   |  2 ++
 2 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 817795c..57d2cd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -228,24 +228,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		assertEquals(1, listener.getNumNotifications());
-
 		assertFalse(reader.nextBufferIsEvent()); // buffer
+
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
 		assertEquals(2, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // event
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // end of partition event
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
@@ -314,24 +310,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		assertEquals(1, listener.getNumNotifications());
-
 		assertFalse(reader.nextBufferIsEvent()); // full buffer
+
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
 		assertEquals(2, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // full buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // event
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // partial buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // end of partition event
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
@@ -370,6 +362,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertFalse(bufferConsumer.isRecycled());
 
 		assertFalse(reader.nextBufferIsEvent());
+
 		// first buffer (non-spilled)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
 		assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
@@ -397,19 +390,19 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		Buffer buffer = bufferConsumer.build();
 		buffer.retainBuffer();
 
-		assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
+		// second buffer (retained in SpillableSubpartition#nextBuffer)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
 		assertEquals(1, partition.getBuffersInBacklog());
 
 		bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
 
-		assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
+		// the event (spilled)
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled)
+		// last buffer (spilled)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
@@ -418,7 +411,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(buffer.isRecycled());
 
 		// End of partition
-		assertTrue(reader.nextBufferIsEvent());
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 8c90215..5989cf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -207,6 +207,8 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
 			assertEquals("next is event", expectedNextBufferIsEvent,
 				bufferAndBacklog.nextBufferIsEvent());
+			assertEquals("next is event", expectedNextBufferIsEvent,
+				readView.nextBufferIsEvent());
 
 			assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
 		} finally {


[flink] 09/11: [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72d522efeef956cadeb8fe53778985855f8ee738
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200

    [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization
    
    - add PipelinedSubpartitionWithReadViewTest which always creates a subpartition,
    an availability listener, and a read view before each test and cleans up after
    each test
    - remove mockito use from testBasicPipelinedProduceConsumeLogic()
---
 .../partition/PipelinedSubpartitionTest.java       | 314 +--------------------
 .../PipelinedSubpartitionWithReadViewTest.java     | 276 ++++++++++++++++++
 .../io/network/partition/SubpartitionTestBase.java |   2 +-
 3 files changed, 292 insertions(+), 300 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index b75bb7a..82f61ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -40,19 +40,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -62,6 +55,8 @@ import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link PipelinedSubpartition}.
+ *
+ * @see PipelinedSubpartitionWithReadViewTest
  */
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -80,189 +75,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		return new PipelinedSubpartition(0, parent);
 	}
 
-	@Test(expected = IllegalStateException.class)
-	public void testAddTwoNonFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			subpartition.add(createBufferBuilder().createBufferConsumer());
-			subpartition.add(createBufferBuilder().createBufferConsumer());
-			assertNull(readView.getNextBuffer());
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testAddEmptyNonFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			BufferBuilder bufferBuilder = createBufferBuilder();
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			assertEquals(0, availablityListener.getNumNotifications());
-			assertNull(readView.getNextBuffer());
-
-			bufferBuilder.finish();
-			bufferBuilder = createBufferBuilder();
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
-			assertNull(readView.getNextBuffer());
-			assertEquals(1, subpartition.getBuffersInBacklog());
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testAddNonEmptyNotFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			BufferBuilder bufferBuilder = createBufferBuilder();
-			bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			// note that since the buffer builder is not finished, there is still a retained instance!
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertEquals(1, subpartition.getBuffersInBacklog());
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would
-	 * busy loop on the unfinished BufferConsumers.
-	 */
-	@Test
-	public void testUnfinishedBufferBehindFinished() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-
-			assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
-			assertNextBuffer(readView, 1025, false, 1, false, true);
-			// not notified, but we could still access the unfinished buffer
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertNoNextBuffer(readView);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some
-	 * of the data.
-	 */
-	@Test
-	public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-			long oldNumNotifications = availablityListener.getNumNotifications();
-			subpartition.flush();
-			// buffer queue is > 1, should already be notified, no further notification necessary
-			assertThat(oldNumNotifications, greaterThan(0L));
-			assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
-
-			assertNextBuffer(readView, 1025, true, 1, false, true);
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertNoNextBuffer(readView);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
-	 */
-	@Test
-	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			// no buffers -> no notification or any other effects
-			subpartition.flush();
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-
-			assertNextBuffer(readView, 1025, false, 1, false, true);
-
-			long oldNumNotifications = availablityListener.getNumNotifications();
-			subpartition.flush();
-			// buffer queue is 1 again -> need to flush
-			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
-			subpartition.flush();
-			// calling again should not flush again
-			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
-
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertNoNextBuffer(readView);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testMultipleEmptyBuffers() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(0));
-
-			assertEquals(1, availablityListener.getNumNotifications());
-			subpartition.add(createFilledBufferConsumer(0));
-			assertEquals(2, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(0));
-			assertEquals(2, availablityListener.getNumNotifications());
-			assertEquals(3, subpartition.getBuffersInBacklog());
-
-			subpartition.add(createFilledBufferConsumer(1024));
-			assertEquals(2, availablityListener.getNumNotifications());
-
-			assertNextBuffer(readView, 1024, false, 0, false, true);
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
 	@Test
 	public void testIllegalReadViewRequest() throws Exception {
 		final PipelinedSubpartition subpartition = createSubpartition();
@@ -278,100 +90,23 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		}
 	}
 
+	/**
+	 * Verifies that the isReleased() check of the view checks the parent
+	 * subpartition.
+	 */
 	@Test
-	public void testEmptyFlush() throws Exception {
-		final PipelinedSubpartition subpartition = createSubpartition();
+	public void testIsReleasedChecksParent() {
+		PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
 
-		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
-		subpartition.createReadView(listener);
-		subpartition.flush();
-		assertEquals(0, listener.getNumNotifications());
-	}
+		PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
+			subpartition, mock(BufferAvailabilityListener.class));
 
-	@Test
-	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
-		final PipelinedSubpartition subpartition = createSubpartition();
+		assertFalse(reader.isReleased());
+		verify(subpartition, times(1)).isReleased();
 
-		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-
-		ResultSubpartitionView view = subpartition.createReadView(listener);
-
-		// Empty => should return null
-		assertFalse(view.nextBufferIsEvent());
-		assertNoNextBuffer(view);
-		assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
-		verify(listener, times(0)).notifyDataAvailable();
-
-		// Add data to the queue...
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(1, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-
-		// ...should have resulted in a notification
-		verify(listener, times(1)).notifyDataAvailable();
-
-		// ...and one available result
-		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
-		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertNoNextBuffer(view);
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// Add data to the queue...
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(2, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		verify(listener, times(2)).notifyDataAvailable();
-
-		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
-		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertNoNextBuffer(view);
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// some tests with events
-
-		// fill with: buffer, event, and buffer
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(5, subpartition.getTotalNumberOfBuffers());
-		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
-		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		verify(listener, times(4)).notifyDataAvailable();
-
-		// the first buffer
-		assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
-		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
-
-		// the event
-		assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
-		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
-
-		// the remaining buffer
-		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
-		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// nothing more
-		assertNoNextBuffer(view);
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		assertEquals(5, subpartition.getTotalNumberOfBuffers());
-		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
-		verify(listener, times(4)).notifyDataAvailable();
+		when(subpartition.isReleased()).thenReturn(true);
+		assertTrue(reader.isReleased());
+		verify(subpartition, times(2)).isReleased();
 	}
 
 	@Test
@@ -394,25 +129,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		testProduceConsume(true, true);
 	}
 
-	/**
-	 * Verifies that the isReleased() check of the view checks the parent
-	 * subpartition.
-	 */
-	@Test
-	public void testIsReleasedChecksParent() throws Exception {
-		PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
-
-		PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
-				subpartition, mock(BufferAvailabilityListener.class));
-
-		assertFalse(reader.isReleased());
-		verify(subpartition, times(1)).isReleased();
-
-		when(subpartition.isReleased()).thenReturn(true);
-		assertTrue(reader.isReleased());
-		verify(subpartition, times(2)).isReleased();
-	}
-
 	private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
 		// Config
 		final int producerBufferPoolSize = 8;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
new file mode 100644
index 0000000..6f9920e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for {@link PipelinedSubpartition} which require an availability listener and a
+ * read view.
+ *
+ * @see PipelinedSubpartitionTest
+ */
+public class PipelinedSubpartitionWithReadViewTest {
+
+	private PipelinedSubpartition subpartition;
+	private AwaitableBufferAvailablityListener availablityListener;
+	private PipelinedSubpartitionView readView;
+
+	@Before
+	public void setup() throws IOException {
+		final ResultPartition parent = mock(ResultPartition.class);
+		subpartition = new PipelinedSubpartition(0, parent);
+		availablityListener = new AwaitableBufferAvailablityListener();
+		readView = subpartition.createReadView(availablityListener);
+	}
+
+	@After
+	public void tearDown() {
+		readView.releaseAllResources();
+		subpartition.release();
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testAddTwoNonFinishedBuffer() {
+		subpartition.add(createBufferBuilder().createBufferConsumer());
+		subpartition.add(createBufferBuilder().createBufferConsumer());
+		assertNull(readView.getNextBuffer());
+	}
+
+	@Test
+	public void testAddEmptyNonFinishedBuffer() {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		assertEquals(0, availablityListener.getNumNotifications());
+		assertNull(readView.getNextBuffer());
+
+		bufferBuilder.finish();
+		bufferBuilder = createBufferBuilder();
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
+		assertNull(readView.getNextBuffer());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+	}
+
+	@Test
+	public void testAddNonEmptyNotFinishedBuffer() throws Exception {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		// note that since the buffer builder is not finished, there is still a retained instance!
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertEquals(1, subpartition.getBuffersInBacklog());
+	}
+
+	/**
+	 * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would
+	 * busy loop on the unfinished BufferConsumers.
+	 */
+	@Test
+	public void testUnfinishedBufferBehindFinished() throws Exception {
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+		assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
+		assertNextBuffer(readView, 1025, false, 1, false, true);
+		// not notified, but we could still access the unfinished buffer
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	/**
+	 * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some
+	 * of the data.
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+		long oldNumNotifications = availablityListener.getNumNotifications();
+		subpartition.flush();
+		// buffer queue is > 1, should already be notified, no further notification necessary
+		assertThat(oldNumNotifications, greaterThan(0L));
+		assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1025, true, 1, false, true);
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	/**
+	 * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+		// no buffers -> no notification or any other effects
+		subpartition.flush();
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+		assertNextBuffer(readView, 1025, false, 1, false, true);
+
+		long oldNumNotifications = availablityListener.getNumNotifications();
+		subpartition.flush();
+		// buffer queue is 1 again -> need to flush
+		assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+		subpartition.flush();
+		// calling again should not flush again
+		assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	@Test
+	public void testMultipleEmptyBuffers() throws Exception {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(0));
+
+		assertEquals(1, availablityListener.getNumNotifications());
+		subpartition.add(createFilledBufferConsumer(0));
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(0));
+		assertEquals(2, availablityListener.getNumNotifications());
+		assertEquals(3, subpartition.getBuffersInBacklog());
+
+		subpartition.add(createFilledBufferConsumer(1024));
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1024, false, 0, false, true);
+	}
+
+	@Test
+	public void testEmptyFlush()  {
+		subpartition.flush();
+		assertEquals(0, availablityListener.getNumNotifications());
+	}
+
+	@Test
+	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
+		// Empty => should return null
+		assertFalse(readView.nextBufferIsEvent());
+		assertNoNextBuffer(readView);
+		assertFalse(readView.nextBufferIsEvent()); // also after getNextBuffer()
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		// Add data to the queue...
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(1, subpartition.getTotalNumberOfBuffers());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+
+		// ...should have resulted in a notification
+		assertEquals(1, availablityListener.getNumNotifications());
+
+		// ...and one available result
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// Add data to the queue...
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(2, subpartition.getTotalNumberOfBuffers());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// some tests with events
+
+		// fill with: buffer, event, and buffer
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(5, subpartition.getTotalNumberOfBuffers());
+		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(4, availablityListener.getNumNotifications());
+
+		// the first buffer
+		assertNextBuffer(readView, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
+		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(1, subpartition.getBuffersInBacklog());
+
+		// the event
+		assertNextEvent(readView, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
+		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(1, subpartition.getBuffersInBacklog());
+
+		// the remaining buffer
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// nothing more
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		assertEquals(5, subpartition.getTotalNumberOfBuffers());
+		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+		assertEquals(4, availablityListener.getNumNotifications());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 5989cf8..9f5e6d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -217,7 +217,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
 	}
 
-	protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
+	static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
 		assertNull(readView.getNextBuffer());
 	}
 }


[flink] 10/11: [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bda07fc1f709f20ce5c3d52df36944e21b869b52
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Sun Aug 5 00:41:02 2018 +0200

    [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers
---
 .../netty/CreditBasedPartitionRequestClientHandler.java       | 11 ++---------
 .../io/network/netty/PartitionRequestClientHandler.java       |  4 +---
 .../flink/runtime/io/network/netty/PartitionRequestQueue.java |  7 +------
 3 files changed, 4 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 90daf75..cc0b222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -89,9 +89,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 	public void addInputChannel(RemoteInputChannel listener) throws IOException {
 		checkError();
 
-		if (!inputChannels.containsKey(listener.getInputChannelId())) {
-			inputChannels.put(listener.getInputChannelId(), listener);
-		}
+		inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
 	}
 
 	@Override
@@ -112,12 +110,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 
 	@Override
 	public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
-		ctx.executor().execute(new Runnable() {
-			@Override
-			public void run() {
-				ctx.pipeline().fireUserEventTriggered(inputChannel);
-			}
-		});
+		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 796e86f..c5ba7a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -85,9 +85,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
 	public void addInputChannel(RemoteInputChannel listener) throws IOException {
 		checkError();
 
-		if (!inputChannels.containsKey(listener.getInputChannelId())) {
-			inputChannels.put(listener.getInputChannelId(), listener);
-		}
+		inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8c05b82..c3d3d1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -89,12 +89,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		// TODO This could potentially have a bad performance impact as in the
 		// worst case (network consumes faster than the producer) each buffer
 		// will trigger a separate event loop task being scheduled.
-		ctx.executor().execute(new Runnable() {
-			@Override
-			public void run() {
-				ctx.pipeline().fireUserEventTriggered(reader);
-			}
-		});
+		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
 	}
 
 	/**


[flink] 01/11: [hotfix][checkstyle] Remove suppression for runtime/network.partition

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a5e1d407c6f8694f0b61febc651a603b175c0fe7
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 11:29:15 2018 +0200

    [hotfix][checkstyle] Remove suppression for runtime/network.partition
---
 .../partition/PartitionNotFoundException.java      |  3 +++
 .../network/partition/PipelinedSubpartition.java   |  6 ++---
 .../io/network/partition/ResultPartition.java      | 26 +++++++++++-----------
 .../ResultPartitionConsumableNotifier.java         |  4 +++-
 .../io/network/partition/ResultPartitionID.java    |  2 +-
 .../network/partition/ResultPartitionProvider.java |  3 +++
 .../io/network/partition/ResultPartitionType.java  |  7 ++++--
 .../io/network/partition/ResultSubpartition.java   | 21 +++++++++--------
 .../network/partition/ResultSubpartitionView.java  |  1 +
 .../network/partition/consumer/InputChannel.java   | 18 +++++++--------
 .../network/partition/consumer/InputChannelID.java |  3 +++
 .../io/network/partition/consumer/InputGate.java   |  8 +++----
 .../partition/consumer/InputGateMetrics.java       |  2 +-
 .../partition/consumer/LocalInputChannel.java      |  4 ++--
 .../partition/consumer/UnknownInputChannel.java    |  6 ++---
 .../network/partition/InputChannelTestUtils.java   |  4 ++--
 .../network/partition/InputGateConcurrentTest.java |  7 ++++--
 .../network/partition/InputGateFairnessTest.java   | 16 +++++++------
 .../LegacyPartialConsumePipelinedResultTest.java   | 13 ++++++-----
 .../PartialConsumePipelinedResultTest.java         |  7 ++++--
 .../partition/PipelinedSubpartitionTest.java       |  7 ++++--
 .../partition/ProducerFailedExceptionTest.java     |  3 +++
 .../partition/consumer/InputChannelTest.java       |  4 ++++
 .../IteratorWrappingTestSingleInputGate.java       |  5 +++++
 .../partition/consumer/LocalInputChannelTest.java  | 19 ++++++++++------
 .../partition/consumer/RemoteInputChannelTest.java |  8 ++++---
 .../partition/consumer/SingleInputGateTest.java    |  2 +-
 .../partition/consumer/TestSingleInputGate.java    |  1 +
 .../partition/consumer/UnionInputGateTest.java     |  5 ++++-
 tools/maven/suppressions-runtime.xml               |  4 ++--
 30 files changed, 135 insertions(+), 84 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
index 7479862..2f78816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
 
 import java.io.IOException;
 
+/**
+ * Exception for failed partition requests due to non-existing partitions.
+ */
 public class PartitionNotFoundException extends IOException {
 
 	private static final long serialVersionUID = 0L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c6f3e15..91e0d4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -188,17 +188,17 @@ class PipelinedSubpartition extends ResultSubpartition {
 				buffer,
 				isAvailableUnsafe(),
 				getBuffersInBacklog(),
-				_nextBufferIsEvent());
+				nextBufferIsEventUnsafe());
 		}
 	}
 
 	boolean nextBufferIsEvent() {
 		synchronized (buffers) {
-			return _nextBufferIsEvent();
+			return nextBufferIsEventUnsafe();
 		}
 	}
 
-	private boolean _nextBufferIsEvent() {
+	private boolean nextBufferIsEventUnsafe() {
 		assert Thread.holdsLock(buffers);
 
 		return !buffers.isEmpty() && !buffers.peekFirst().isBuffer();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 93e5ba1..b32f73f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -48,17 +48,17 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A result partition for data produced by a single task.
  *
- * <p> This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
+ * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
  * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
  * or more {@link ResultSubpartition} instances, which further partition the data depending on the
  * number of consuming tasks and the data {@link DistributionPattern}.
  *
- * <p> Tasks, which consume a result partition have to request one of its subpartitions. The request
+ * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
  * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
  *
  * <h2>Life-cycle</h2>
  *
- * The life-cycle of each result partition has three (possibly overlapping) phases:
+ * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
  * <ol>
  * <li><strong>Produce</strong>: </li>
  * <li><strong>Consume</strong>: </li>
@@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <h2>Lazy deployment and updates of consuming tasks</h2>
  *
- * Before a consuming task can request the result, it has to be deployed. The time of deployment
+ * <p>Before a consuming task can request the result, it has to be deployed. The time of deployment
  * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined
  * results, receivers are deployed as soon as the first buffer is added to the result partition.
  * With blocking results on the other hand, receivers are deployed after the partition is finished.
@@ -79,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
-	
+
 	private final String owningTaskName;
 
 	private final TaskActions taskActions;
@@ -174,10 +174,10 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	/**
 	 * Registers a buffer pool with this result partition.
-	 * <p>
-	 * There is one pool for each result partition, which is shared by all its sub partitions.
-	 * <p>
-	 * The pool is registered with the partition *after* it as been constructed in order to conform
+	 *
+	 * <p>There is one pool for each result partition, which is shared by all its sub partitions.
+	 *
+	 * <p>The pool is registered with the partition *after* it as been constructed in order to conform
 	 * to the life-cycle of task registrations in the {@link TaskManager}.
 	 */
 	public void registerBufferPool(BufferPool bufferPool) {
@@ -276,9 +276,9 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	/**
 	 * Finishes the result partition.
 	 *
-	 * <p> After this operation, it is not possible to add further data to the result partition.
+	 * <p>After this operation, it is not possible to add further data to the result partition.
 	 *
-	 * <p> For BLOCKING results, this will trigger the deployment of consuming tasks.
+	 * <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
 	 */
 	public void finish() throws IOException {
 		boolean success = false;
@@ -366,7 +366,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	/**
 	 * Releases buffers held by this result partition.
 	 *
-	 * <p> This is a callback from the buffer pool, which is registered for result partitions, which
+	 * <p>This is a callback from the buffer pool, which is registered for result partitions, which
 	 * are back pressure-free.
 	 */
 	@Override
@@ -395,7 +395,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	/**
 	 * Pins the result partition.
 	 *
-	 * <p> The partition can only be released after each subpartition has been consumed once per pin
+	 * <p>The partition can only be released after each subpartition has been consumed once per pin
 	 * operation.
 	 */
 	void pin() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 02212ce..10eb086 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
+/**
+ * Interface for notifications about consumable partitions.
+ */
 public interface ResultPartitionConsumableNotifier {
 	void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index b84c33b..cee79a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 /**
  * Runtime identifier of a produced {@link IntermediateResultPartition}.
  *
- * <p> In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
+ * <p>In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
  * identify a result partition. It needs to be associated with the producing task as well to ensure
  * correct tracking of failed/restarted tasks.
  */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index db72d63..faeaaf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
 
 import java.io.IOException;
 
+/**
+ * Interface for creating result partitions.
+ */
 public interface ResultPartitionProvider {
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 256387c..f62dbeeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+/**
+ * Type of a result partition.
+ */
 public enum ResultPartitionType {
 
 	BLOCKING(false, false, false),
@@ -27,12 +30,12 @@ public enum ResultPartitionType {
 	/**
 	 * Pipelined partitions with a bounded (local) buffer pool.
 	 *
-	 * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
+	 * <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
 	 * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
 	 * overall network buffer pool size, this, however, still allows to be flexible with regards
 	 * to the total number of partitions by selecting an appropriately big network buffer pool size.
 	 *
-	 * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
+	 * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
 	 * no checkpoint barriers.
 	 */
 	PIPELINED_BOUNDED(true, true, true);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index adc0ed3..58a1402 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -43,16 +43,16 @@ public abstract class ResultSubpartition {
 	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
 	protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
 
-	/** The number of non-event buffers currently in this subpartition */
+	/** The number of non-event buffers currently in this subpartition. */
 	@GuardedBy("buffers")
 	private int buffersInBacklog;
 
 	// - Statistics ----------------------------------------------------------
 
-	/** The total number of buffers (both data and event buffers) */
+	/** The total number of buffers (both data and event buffers). */
 	private long totalNumberOfBuffers;
 
-	/** The total number of bytes (both data and event buffers) */
+	/** The total number of bytes (both data and event buffers). */
 	private long totalNumberOfBytes;
 
 	public ResultSubpartition(int index, ResultPartition parent) {
@@ -102,19 +102,19 @@ public abstract class ResultSubpartition {
 	 * @throws IOException
 	 * 		thrown in case of errors while adding the buffer
 	 */
-	abstract public boolean add(BufferConsumer bufferConsumer) throws IOException;
+	public abstract boolean add(BufferConsumer bufferConsumer) throws IOException;
 
-	abstract public void flush();
+	public abstract void flush();
 
-	abstract public void finish() throws IOException;
+	public abstract void finish() throws IOException;
 
-	abstract public void release() throws IOException;
+	public abstract void release() throws IOException;
 
-	abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+	public abstract ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
 
 	abstract int releaseMemory() throws IOException;
 
-	abstract public boolean isReleased();
+	public abstract boolean isReleased();
 
 	/**
 	 * Gets the number of non-event buffers in this subpartition.
@@ -132,7 +132,7 @@ public abstract class ResultSubpartition {
 	 * This method must not acquire locks or interfere with the task and network threads in
 	 * any way.
 	 */
-	abstract public int unsynchronizedGetNumberOfQueuedBuffers();
+	public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
 	/**
 	 * Decreases the number of non-event buffers by one after fetching a non-event
@@ -198,7 +198,6 @@ public abstract class ResultSubpartition {
 			return buffersInBacklog;
 		}
 
-
 		public boolean nextBufferIsEvent() {
 			return nextBufferIsEvent;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index b1ccd63..a755955 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 2a7cedf..a08ecc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -34,8 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An input channel consumes a single {@link ResultSubpartitionView}.
- * <p>
- * For each channel, the consumption life cycle is as follows:
+ *
+ * <p>For each channel, the consumption life cycle is as follows:
  * <ol>
  * <li>{@link #requestSubpartition(int)}</li>
  * <li>{@link #getNextBuffer()}</li>
@@ -66,7 +66,7 @@ public abstract class InputChannel {
 
 	protected final Counter numBuffersIn;
 
-	/** The current backoff (in ms) */
+	/** The current backoff (in ms). */
 	private int currentBackoff;
 
 	protected InputChannel(
@@ -111,12 +111,12 @@ public abstract class InputChannel {
 
 	/**
 	 * Notifies the owning {@link SingleInputGate} that this channel became non-empty.
-	 * 
+	 *
 	 * <p>This is guaranteed to be called only when a Buffer was added to a previously
 	 * empty input channel. The notion of empty is atomically consistent with the flag
 	 * {@link BufferAndAvailability#moreAvailable()} when polling the next buffer
 	 * from this channel.
-	 * 
+	 *
 	 * <p><b>Note:</b> When the input channel observes an exception, this
 	 * method is called regardless of whether the channel was empty before. That ensures
 	 * that the parent InputGate will always be notified about the exception.
@@ -132,8 +132,8 @@ public abstract class InputChannel {
 	/**
 	 * Requests the queue with the specified index of the source intermediate
 	 * result partition.
-	 * <p>
-	 * The queue index to request depends on which sub task the channel belongs
+	 *
+	 * <p>The queue index to request depends on which sub task the channel belongs
 	 * to and is specified by the consumer of this channel.
 	 */
 	abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
@@ -149,8 +149,8 @@ public abstract class InputChannel {
 
 	/**
 	 * Sends a {@link TaskEvent} back to the task producing the consumed result partition.
-	 * <p>
-	 * <strong>Important</strong>: The producing task has to be running to receive backwards events.
+	 *
+	 * <p><strong>Important</strong>: The producing task has to be running to receive backwards events.
 	 * This means that the result type needs to be pipelined and the task logic has to ensure that
 	 * the producer will wait for all backwards events. Otherwise, this will lead to an Exception
 	 * at runtime.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
index ceeb83d..c1886de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
@@ -22,6 +22,9 @@ import org.apache.flink.util.AbstractID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
+/**
+ * Identifier for input channels.
+ */
 public class InputChannelID extends AbstractID {
 
 	private static final long serialVersionUID = 1L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c78abb5..6e59f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -26,10 +26,10 @@ import java.util.Optional;
 /**
  * An input gate consumes one or more partitions of a single produced intermediate result.
  *
- * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these
  * partitions is furthermore partitioned into one or more subpartitions.
  *
- * <p> As an example, consider a map-reduce program, where the map operator produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator produces data and the
  * reduce operator consumes the produced data.
  *
  * <pre>{@code
@@ -38,7 +38,7 @@ import java.util.Optional;
  * +-----+              +---------------------+              +--------+
  * }</pre>
  *
- * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its
  * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
  * subpartitions.
  *
@@ -59,7 +59,7 @@ import java.util.Optional;
  *               +-----------------------------------------+
  * }</pre>
  *
- * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting
  * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
  * subpartitions -- one for each parallel reduce subtask. As shown in the Figure, each reduce task
  * will have an input gate attached to it. This will provide its input, which will consist of one
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 69af455..ebb8b9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -40,7 +40,7 @@ public class InputGateMetrics {
 
 	// ------------------------------------------------------------------------
 
-	// these methods are package private to make access from the nested classes faster 
+	// these methods are package private to make access from the nested classes faster
 
 	/**
 	 * Iterates over all input channels and collects the total number of queued buffers in a
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4b3a8ff..e7986bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -57,7 +57,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	/** Task event dispatcher for backwards events. */
 	private final TaskEventDispatcher taskEventDispatcher;
 
-	/** The consumed subpartition */
+	/** The consumed subpartition. */
 	private volatile ResultSubpartitionView subpartitionView;
 
 	private volatile boolean isReleased;
@@ -245,7 +245,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	}
 
 	/**
-	 * Releases the partition reader
+	 * Releases the partition reader.
 	 */
 	@Override
 	void releaseAllResources() throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 20a7aed..c0e9177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -89,8 +89,8 @@ class UnknownInputChannel extends InputChannel {
 
 	/**
 	 * Returns <code>false</code>.
-	 * <p>
-	 * <strong>Important</strong>: It is important that the method correctly
+	 *
+	 * <p><strong>Important</strong>: It is important that the method correctly
 	 * always <code>false</code> for unknown input channels in order to not
 	 * finish the consumption of an intermediate result partition early.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index f73ede7..f7db40b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -59,7 +59,7 @@ public class InputChannelTestUtils {
 
 		return manager;
 	}
-	
+
 	public static ConnectionManager createDummyConnectionManager() throws Exception {
 		final PartitionRequestClient mockClient = mock(PartitionRequestClient.class);
 
@@ -71,6 +71,6 @@ public class InputChannelTestUtils {
 
 	// ------------------------------------------------------------------------
 
-	/** This class is not meant to be instantiated */
+	/** This class is not meant to be instantiated. */
 	private InputChannelTestUtils() {}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 73f3cfb..5f5728d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -46,6 +46,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Concurrency tests for input gates.
+ */
 public class InputGateConcurrentTest {
 
 	@Test
@@ -192,8 +195,8 @@ public class InputGateConcurrentTest {
 	//  testing threads
 	// ------------------------------------------------------------------------
 
-	private static abstract class Source {
-	
+	private abstract static class Source {
+
 		abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 82a27cc..6691875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -58,6 +58,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests verifying fairness in input gates.
+ */
 public class InputGateFairnessTest {
 
 	@Test
@@ -115,7 +118,7 @@ public class InputGateFairnessTest {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 		}
 
 		assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -207,11 +210,11 @@ public class InputGateFairnessTest {
 
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
-					gate, i, new ResultPartitionID(), mock(ConnectionID.class), 
+					gate, i, new ResultPartitionID(), mock(ConnectionID.class),
 					connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 			channels[i] = channel;
-			
+
 			for (int p = 0; p < buffersPerChannel; p++) {
 				channel.onBuffer(mockBuffer, p, -1);
 			}
@@ -233,7 +236,7 @@ public class InputGateFairnessTest {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 		}
 
 		assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -287,7 +290,7 @@ public class InputGateFairnessTest {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 
 			if (i % (2 * numChannels) == 0) {
 				// add three buffers to each channel, in random order
@@ -336,7 +339,7 @@ public class InputGateFairnessTest {
 			partitions[i].onBuffer(buffer, sequenceNumbers[i]++, -1);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static class FairnessVerifyingInputGate extends SingleInputGate {
@@ -372,7 +375,6 @@ public class InputGateFairnessTest {
 			this.uniquenessChecker = new HashSet<>();
 		}
 
-
 		@Override
 		public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
 			synchronized (channelsWithData) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
index aecab75..b83067c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -40,14 +40,17 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
-	private final static int NUMBER_OF_TMS = 1;
-	private final static int NUMBER_OF_SLOTS_PER_TM = 1;
-	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+	private static final int NUMBER_OF_TMS = 1;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
-	private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+	private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
 
 	private static TestingCluster flink;
 
@@ -72,7 +75,7 @@ public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
 	/**
 	 * Tests a fix for FLINK-1930.
 	 *
-	 * <p> When consuming a pipelined result only partially, is is possible that local channels
+	 * <p>When consuming a pipelined result only partially, is is possible that local channels
 	 * release the buffer pool, which is associated with the result partition, too early.  If the
 	 * producer is still producing data when this happens, it runs into an IllegalStateException,
 	 * because of the destroyed buffer pool.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 2eec34c..f6689fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -40,6 +40,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class PartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
@@ -78,8 +81,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 	/**
 	 * Tests a fix for FLINK-1930.
 	 *
-	 * <p> When consuming a pipelined result only partially, is is possible that local channels
-	 * release the buffer pool, which is associated with the result partition, too early.  If the
+	 * <p>When consuming a pipelined result only partially, is is possible that local channels
+	 * release the buffer pool, which is associated with the result partition, too early. If the
 	 * producer is still producing data when this happens, it runs into an IllegalStateException,
 	 * because of the destroyed buffer pool.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index bc66c9d..fc9a643 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -59,10 +59,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link PipelinedSubpartition}.
+ */
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
-	/** Executor service for concurrent produce/consume tests */
-	private final static ExecutorService executorService = Executors.newCachedThreadPool();
+	/** Executor service for concurrent produce/consume tests. */
+	private static final ExecutorService executorService = Executors.newCachedThreadPool();
 
 	@AfterClass
 	public static void shutdownExecutorService() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 6bff0f6..d182f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -26,6 +26,9 @@ import org.junit.Test;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link ProducerFailedException}.
+ */
 public class ProducerFailedExceptionTest {
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index d757aa9..2f5a013 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -31,6 +32,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link InputChannel}.
+ */
 public class InputChannelTest {
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index a914733..a67df0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -36,6 +36,11 @@ import java.util.Optional;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
+/**
+ * Input gate helper for unit tests.
+ *
+ * @param <T> type of the value to handle
+ */
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
 
 	private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 1ecb67f..2afd6d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -75,12 +75,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link LocalInputChannel}.
+ */
 public class LocalInputChannelTest {
 
 	/**
 	 * Tests the consumption of multiple subpartitions via local input channels.
 	 *
-	 * <p> Multiple producer tasks produce pipelined partitions, which are consumed by multiple
+	 * <p>Multiple producer tasks produce pipelined partitions, which are consumed by multiple
 	 * tasks via local input channels.
 	 */
 	@Test
@@ -266,20 +269,22 @@ public class LocalInputChannelTest {
 	 * Verifies that concurrent release via the SingleInputGate and re-triggering
 	 * of a partition request works smoothly.
 	 *
-	 * - SingleInputGate acquires its request lock and tries to release all
+	 * <ul>
+	 * <li>SingleInputGate acquires its request lock and tries to release all
 	 * registered channels. When releasing a channel, it needs to acquire
-	 * the channel's shared request-release lock.
-	 * - If a LocalInputChannel concurrently retriggers a partition request via
+	 * the channel's shared request-release lock.</li>
+	 * <li>If a LocalInputChannel concurrently retriggers a partition request via
 	 * a Timer Thread it acquires the channel's request-release lock and calls
 	 * the retrigger callback on the SingleInputGate, which again tries to
-	 * acquire the gate's request lock.
+	 * acquire the gate's request lock.</li>
+	 * </ul>
 	 *
-	 * For certain timings this obviously leads to a deadlock. This test reliably
+	 * <p>For certain timings this obviously leads to a deadlock. This test reliably
 	 * reproduced such a timing (reported in FLINK-5228). This test is pretty much
 	 * testing the buggy implementation and has not much more general value. If it
 	 * becomes obsolete at some point (future greatness ;)), feel free to remove it.
 	 *
-	 * The fix in the end was to to not acquire the channels lock when releasing it
+	 * <p>The fix in the end was to to not acquire the channels lock when releasing it
 	 * and/or not doing any input gate callbacks while holding the channel's lock.
 	 * I decided to do both.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9141b36..ec80459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -336,9 +336,11 @@ public class RemoteInputChannelTest {
 	 * Tests to verify the behaviours of three different processes if the number of available
 	 * buffers is less than required buffers.
 	 *
-	 * 1. Recycle the floating buffer
-	 * 2. Recycle the exclusive buffer
-	 * 3. Decrease the sender's backlog
+	 * <ol>
+	 * <li>Recycle the floating buffer</li>
+	 * <li>Recycle the exclusive buffer</li>
+	 * <li>Decrease the sender's backlog</li>
+	 * </ol>
 	 */
 	@Test
 	public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 7120327..4bf5b22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -164,7 +164,7 @@ public class SingleInputGateTest {
 
 		final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
 		when(iterator.getNextBuffer()).thenReturn(
-			new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false,0, false));
+			new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false, 0, false));
 
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager.createSubpartitionView(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index b0bafd5..33f5709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 2e01225..96f01fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -32,13 +32,16 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link UnionInputGate}.
+ */
 public class UnionInputGateTest {
 
 	/**
 	 * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
 	 * value after receiving all end-of-partition events.
 	 *
-	 * <p> For buffer-or-event instances, it is important to verify that they have been set off to
+	 * <p>For buffer-or-event instances, it is important to verify that they have been set off to
 	 * the correct logical index.
 	 */
 	@Test(timeout = 120 * 1000)
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 5efc974..cf985a9 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -80,11 +80,11 @@ under the License.
 		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]disk[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<suppress
-		files="(.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)"
+		files="(.*)runtime[/\\]io[/\\]network[/\\](netty|util)[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...]
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
 	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)"
+		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](netty|util)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<!--Test class copied from the netty project-->
 	<suppress