You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:18 UTC

[flink] 01/09: [hotfix][checkstyle] Remove suppression for runtime/network.partition

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 542a35a1f57e8cc01b8ebe2b7f6b6f88b59ea875
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 11:29:15 2018 +0200

    [hotfix][checkstyle] Remove suppression for runtime/network.partition
---
 .../partition/PartitionNotFoundException.java      |  3 +++
 .../network/partition/PipelinedSubpartition.java   |  6 ++---
 .../io/network/partition/ResultPartition.java      | 26 +++++++++++-----------
 .../ResultPartitionConsumableNotifier.java         |  4 +++-
 .../io/network/partition/ResultPartitionID.java    |  2 +-
 .../network/partition/ResultPartitionProvider.java |  3 +++
 .../io/network/partition/ResultPartitionType.java  |  7 ++++--
 .../io/network/partition/ResultSubpartition.java   | 21 +++++++++--------
 .../network/partition/ResultSubpartitionView.java  |  1 +
 .../network/partition/consumer/InputChannel.java   | 18 +++++++--------
 .../network/partition/consumer/InputChannelID.java |  3 +++
 .../io/network/partition/consumer/InputGate.java   |  8 +++----
 .../partition/consumer/InputGateMetrics.java       |  2 +-
 .../partition/consumer/LocalInputChannel.java      |  4 ++--
 .../partition/consumer/UnknownInputChannel.java    |  6 ++---
 .../network/partition/InputChannelTestUtils.java   |  4 ++--
 .../network/partition/InputGateConcurrentTest.java |  7 ++++--
 .../network/partition/InputGateFairnessTest.java   | 16 +++++++------
 .../LegacyPartialConsumePipelinedResultTest.java   | 13 ++++++-----
 .../PartialConsumePipelinedResultTest.java         |  7 ++++--
 .../partition/PipelinedSubpartitionTest.java       |  7 ++++--
 .../partition/ProducerFailedExceptionTest.java     |  3 +++
 .../partition/consumer/InputChannelTest.java       |  4 ++++
 .../IteratorWrappingTestSingleInputGate.java       |  5 +++++
 .../partition/consumer/LocalInputChannelTest.java  | 19 ++++++++++------
 .../partition/consumer/RemoteInputChannelTest.java |  8 ++++---
 .../partition/consumer/SingleInputGateTest.java    |  2 +-
 .../partition/consumer/TestSingleInputGate.java    |  1 +
 .../partition/consumer/UnionInputGateTest.java     |  5 ++++-
 tools/maven/suppressions-runtime.xml               |  4 ++--
 30 files changed, 135 insertions(+), 84 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
index 7479862..2f78816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
 
 import java.io.IOException;
 
+/**
+ * Exception for failed partition requests due to non-existing partitions.
+ */
 public class PartitionNotFoundException extends IOException {
 
 	private static final long serialVersionUID = 0L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c6f3e15..91e0d4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -188,17 +188,17 @@ class PipelinedSubpartition extends ResultSubpartition {
 				buffer,
 				isAvailableUnsafe(),
 				getBuffersInBacklog(),
-				_nextBufferIsEvent());
+				nextBufferIsEventUnsafe());
 		}
 	}
 
 	boolean nextBufferIsEvent() {
 		synchronized (buffers) {
-			return _nextBufferIsEvent();
+			return nextBufferIsEventUnsafe();
 		}
 	}
 
-	private boolean _nextBufferIsEvent() {
+	private boolean nextBufferIsEventUnsafe() {
 		assert Thread.holdsLock(buffers);
 
 		return !buffers.isEmpty() && !buffers.peekFirst().isBuffer();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 93e5ba1..b32f73f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -48,17 +48,17 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A result partition for data produced by a single task.
  *
- * <p> This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
+ * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
  * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
  * or more {@link ResultSubpartition} instances, which further partition the data depending on the
  * number of consuming tasks and the data {@link DistributionPattern}.
  *
- * <p> Tasks, which consume a result partition have to request one of its subpartitions. The request
+ * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
  * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
  *
  * <h2>Life-cycle</h2>
  *
- * The life-cycle of each result partition has three (possibly overlapping) phases:
+ * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
  * <ol>
  * <li><strong>Produce</strong>: </li>
  * <li><strong>Consume</strong>: </li>
@@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <h2>Lazy deployment and updates of consuming tasks</h2>
  *
- * Before a consuming task can request the result, it has to be deployed. The time of deployment
+ * <p>Before a consuming task can request the result, it has to be deployed. The time of deployment
  * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined
  * results, receivers are deployed as soon as the first buffer is added to the result partition.
  * With blocking results on the other hand, receivers are deployed after the partition is finished.
@@ -79,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
-	
+
 	private final String owningTaskName;
 
 	private final TaskActions taskActions;
@@ -174,10 +174,10 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	/**
 	 * Registers a buffer pool with this result partition.
-	 * <p>
-	 * There is one pool for each result partition, which is shared by all its sub partitions.
-	 * <p>
-	 * The pool is registered with the partition *after* it as been constructed in order to conform
+	 *
+	 * <p>There is one pool for each result partition, which is shared by all its sub partitions.
+	 *
+	 * <p>The pool is registered with the partition *after* it has been constructed in order to conform
 	 * to the life-cycle of task registrations in the {@link TaskManager}.
 	 */
 	public void registerBufferPool(BufferPool bufferPool) {
@@ -276,9 +276,9 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	/**
 	 * Finishes the result partition.
 	 *
-	 * <p> After this operation, it is not possible to add further data to the result partition.
+	 * <p>After this operation, it is not possible to add further data to the result partition.
 	 *
-	 * <p> For BLOCKING results, this will trigger the deployment of consuming tasks.
+	 * <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
 	 */
 	public void finish() throws IOException {
 		boolean success = false;
@@ -366,7 +366,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	/**
 	 * Releases buffers held by this result partition.
 	 *
-	 * <p> This is a callback from the buffer pool, which is registered for result partitions, which
+	 * <p>This is a callback from the buffer pool, which is registered for result partitions, which
 	 * are back pressure-free.
 	 */
 	@Override
@@ -395,7 +395,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	/**
 	 * Pins the result partition.
 	 *
-	 * <p> The partition can only be released after each subpartition has been consumed once per pin
+	 * <p>The partition can only be released after each subpartition has been consumed once per pin
 	 * operation.
 	 */
 	void pin() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 02212ce..10eb086 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
+/**
+ * Interface for notifications about consumable partitions.
+ */
 public interface ResultPartitionConsumableNotifier {
 	void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index b84c33b..cee79a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 /**
  * Runtime identifier of a produced {@link IntermediateResultPartition}.
  *
- * <p> In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
+ * <p>In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
  * identify a result partition. It needs to be associated with the producing task as well to ensure
  * correct tracking of failed/restarted tasks.
  */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index db72d63..faeaaf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
 
 import java.io.IOException;
 
+/**
+ * Interface for creating result partitions.
+ */
 public interface ResultPartitionProvider {
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 256387c..f62dbeeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+/**
+ * Type of a result partition.
+ */
 public enum ResultPartitionType {
 
 	BLOCKING(false, false, false),
@@ -27,12 +30,12 @@ public enum ResultPartitionType {
 	/**
 	 * Pipelined partitions with a bounded (local) buffer pool.
 	 *
-	 * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
+	 * <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
 	 * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
 	 * overall network buffer pool size, this, however, still allows to be flexible with regards
 	 * to the total number of partitions by selecting an appropriately big network buffer pool size.
 	 *
-	 * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
+	 * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
 	 * no checkpoint barriers.
 	 */
 	PIPELINED_BOUNDED(true, true, true);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index adc0ed3..58a1402 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -43,16 +43,16 @@ public abstract class ResultSubpartition {
 	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
 	protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
 
-	/** The number of non-event buffers currently in this subpartition */
+	/** The number of non-event buffers currently in this subpartition. */
 	@GuardedBy("buffers")
 	private int buffersInBacklog;
 
 	// - Statistics ----------------------------------------------------------
 
-	/** The total number of buffers (both data and event buffers) */
+	/** The total number of buffers (both data and event buffers). */
 	private long totalNumberOfBuffers;
 
-	/** The total number of bytes (both data and event buffers) */
+	/** The total number of bytes (both data and event buffers). */
 	private long totalNumberOfBytes;
 
 	public ResultSubpartition(int index, ResultPartition parent) {
@@ -102,19 +102,19 @@ public abstract class ResultSubpartition {
 	 * @throws IOException
 	 * 		thrown in case of errors while adding the buffer
 	 */
-	abstract public boolean add(BufferConsumer bufferConsumer) throws IOException;
+	public abstract boolean add(BufferConsumer bufferConsumer) throws IOException;
 
-	abstract public void flush();
+	public abstract void flush();
 
-	abstract public void finish() throws IOException;
+	public abstract void finish() throws IOException;
 
-	abstract public void release() throws IOException;
+	public abstract void release() throws IOException;
 
-	abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+	public abstract ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
 
 	abstract int releaseMemory() throws IOException;
 
-	abstract public boolean isReleased();
+	public abstract boolean isReleased();
 
 	/**
 	 * Gets the number of non-event buffers in this subpartition.
@@ -132,7 +132,7 @@ public abstract class ResultSubpartition {
 	 * This method must not acquire locks or interfere with the task and network threads in
 	 * any way.
 	 */
-	abstract public int unsynchronizedGetNumberOfQueuedBuffers();
+	public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
 	/**
 	 * Decreases the number of non-event buffers by one after fetching a non-event
@@ -198,7 +198,6 @@ public abstract class ResultSubpartition {
 			return buffersInBacklog;
 		}
 
-
 		public boolean nextBufferIsEvent() {
 			return nextBufferIsEvent;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index b1ccd63..a755955 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 2a7cedf..a08ecc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -34,8 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An input channel consumes a single {@link ResultSubpartitionView}.
- * <p>
- * For each channel, the consumption life cycle is as follows:
+ *
+ * <p>For each channel, the consumption life cycle is as follows:
  * <ol>
  * <li>{@link #requestSubpartition(int)}</li>
  * <li>{@link #getNextBuffer()}</li>
@@ -66,7 +66,7 @@ public abstract class InputChannel {
 
 	protected final Counter numBuffersIn;
 
-	/** The current backoff (in ms) */
+	/** The current backoff (in ms). */
 	private int currentBackoff;
 
 	protected InputChannel(
@@ -111,12 +111,12 @@ public abstract class InputChannel {
 
 	/**
 	 * Notifies the owning {@link SingleInputGate} that this channel became non-empty.
-	 * 
+	 *
 	 * <p>This is guaranteed to be called only when a Buffer was added to a previously
 	 * empty input channel. The notion of empty is atomically consistent with the flag
 	 * {@link BufferAndAvailability#moreAvailable()} when polling the next buffer
 	 * from this channel.
-	 * 
+	 *
 	 * <p><b>Note:</b> When the input channel observes an exception, this
 	 * method is called regardless of whether the channel was empty before. That ensures
 	 * that the parent InputGate will always be notified about the exception.
@@ -132,8 +132,8 @@ public abstract class InputChannel {
 	/**
 	 * Requests the queue with the specified index of the source intermediate
 	 * result partition.
-	 * <p>
-	 * The queue index to request depends on which sub task the channel belongs
+	 *
+	 * <p>The queue index to request depends on which sub task the channel belongs
 	 * to and is specified by the consumer of this channel.
 	 */
 	abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
@@ -149,8 +149,8 @@ public abstract class InputChannel {
 
 	/**
 	 * Sends a {@link TaskEvent} back to the task producing the consumed result partition.
-	 * <p>
-	 * <strong>Important</strong>: The producing task has to be running to receive backwards events.
+	 *
+	 * <p><strong>Important</strong>: The producing task has to be running to receive backwards events.
 	 * This means that the result type needs to be pipelined and the task logic has to ensure that
 	 * the producer will wait for all backwards events. Otherwise, this will lead to an Exception
 	 * at runtime.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
index ceeb83d..c1886de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
@@ -22,6 +22,9 @@ import org.apache.flink.util.AbstractID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
+/**
+ * Identifier for input channels.
+ */
 public class InputChannelID extends AbstractID {
 
 	private static final long serialVersionUID = 1L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c78abb5..6e59f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -26,10 +26,10 @@ import java.util.Optional;
 /**
  * An input gate consumes one or more partitions of a single produced intermediate result.
  *
- * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these
  * partitions is furthermore partitioned into one or more subpartitions.
  *
- * <p> As an example, consider a map-reduce program, where the map operator produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator produces data and the
  * reduce operator consumes the produced data.
  *
  * <pre>{@code
@@ -38,7 +38,7 @@ import java.util.Optional;
  * +-----+              +---------------------+              +--------+
  * }</pre>
  *
- * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its
  * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
  * subpartitions.
  *
@@ -59,7 +59,7 @@ import java.util.Optional;
  *               +-----------------------------------------+
  * }</pre>
  *
- * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting
  * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
  * subpartitions -- one for each parallel reduce subtask. As shown in the Figure, each reduce task
  * will have an input gate attached to it. This will provide its input, which will consist of one
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 69af455..ebb8b9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -40,7 +40,7 @@ public class InputGateMetrics {
 
 	// ------------------------------------------------------------------------
 
-	// these methods are package private to make access from the nested classes faster 
+	// these methods are package private to make access from the nested classes faster
 
 	/**
 	 * Iterates over all input channels and collects the total number of queued buffers in a
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4b3a8ff..e7986bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -57,7 +57,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	/** Task event dispatcher for backwards events. */
 	private final TaskEventDispatcher taskEventDispatcher;
 
-	/** The consumed subpartition */
+	/** The consumed subpartition. */
 	private volatile ResultSubpartitionView subpartitionView;
 
 	private volatile boolean isReleased;
@@ -245,7 +245,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	}
 
 	/**
-	 * Releases the partition reader
+	 * Releases the partition reader.
 	 */
 	@Override
 	void releaseAllResources() throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 20a7aed..c0e9177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -89,8 +89,8 @@ class UnknownInputChannel extends InputChannel {
 
 	/**
 	 * Returns <code>false</code>.
-	 * <p>
-	 * <strong>Important</strong>: It is important that the method correctly
+	 *
+	 * <p><strong>Important</strong>: It is important that the method correctly returns
 	 * always <code>false</code> for unknown input channels in order to not
 	 * finish the consumption of an intermediate result partition early.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index f73ede7..f7db40b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -59,7 +59,7 @@ public class InputChannelTestUtils {
 
 		return manager;
 	}
-	
+
 	public static ConnectionManager createDummyConnectionManager() throws Exception {
 		final PartitionRequestClient mockClient = mock(PartitionRequestClient.class);
 
@@ -71,6 +71,6 @@ public class InputChannelTestUtils {
 
 	// ------------------------------------------------------------------------
 
-	/** This class is not meant to be instantiated */
+	/** This class is not meant to be instantiated. */
 	private InputChannelTestUtils() {}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 73f3cfb..5f5728d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -46,6 +46,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Concurrency tests for input gates.
+ */
 public class InputGateConcurrentTest {
 
 	@Test
@@ -192,8 +195,8 @@ public class InputGateConcurrentTest {
 	//  testing threads
 	// ------------------------------------------------------------------------
 
-	private static abstract class Source {
-	
+	private abstract static class Source {
+
 		abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 82a27cc..6691875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -58,6 +58,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests verifying fairness in input gates.
+ */
 public class InputGateFairnessTest {
 
 	@Test
@@ -115,7 +118,7 @@ public class InputGateFairnessTest {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 		}
 
 		assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -207,11 +210,11 @@ public class InputGateFairnessTest {
 
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
-					gate, i, new ResultPartitionID(), mock(ConnectionID.class), 
+					gate, i, new ResultPartitionID(), mock(ConnectionID.class),
 					connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 			channels[i] = channel;
-			
+
 			for (int p = 0; p < buffersPerChannel; p++) {
 				channel.onBuffer(mockBuffer, p, -1);
 			}
@@ -233,7 +236,7 @@ public class InputGateFairnessTest {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 		}
 
 		assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -287,7 +290,7 @@ public class InputGateFairnessTest {
 				max = Math.max(max, size);
 			}
 
-			assertTrue(max == min || max == min+1);
+			assertTrue(max == min || max == (min + 1));
 
 			if (i % (2 * numChannels) == 0) {
 				// add three buffers to each channel, in random order
@@ -336,7 +339,7 @@ public class InputGateFairnessTest {
 			partitions[i].onBuffer(buffer, sequenceNumbers[i]++, -1);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static class FairnessVerifyingInputGate extends SingleInputGate {
@@ -372,7 +375,6 @@ public class InputGateFairnessTest {
 			this.uniquenessChecker = new HashSet<>();
 		}
 
-
 		@Override
 		public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
 			synchronized (channelsWithData) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
index aecab75..b83067c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -40,14 +40,17 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
-	private final static int NUMBER_OF_TMS = 1;
-	private final static int NUMBER_OF_SLOTS_PER_TM = 1;
-	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+	private static final int NUMBER_OF_TMS = 1;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
-	private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+	private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
 
 	private static TestingCluster flink;
 
@@ -72,7 +75,7 @@ public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
 	/**
 	 * Tests a fix for FLINK-1930.
 	 *
-	 * <p> When consuming a pipelined result only partially, is is possible that local channels
+	 * <p>When consuming a pipelined result only partially, it is possible that local channels
 	 * release the buffer pool, which is associated with the result partition, too early.  If the
 	 * producer is still producing data when this happens, it runs into an IllegalStateException,
 	 * because of the destroyed buffer pool.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 2eec34c..f6689fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -40,6 +40,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class PartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
@@ -78,8 +81,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 	/**
 	 * Tests a fix for FLINK-1930.
 	 *
-	 * <p> When consuming a pipelined result only partially, is is possible that local channels
-	 * release the buffer pool, which is associated with the result partition, too early.  If the
+	 * <p>When consuming a pipelined result only partially, it is possible that local channels
+	 * release the buffer pool, which is associated with the result partition, too early. If the
 	 * producer is still producing data when this happens, it runs into an IllegalStateException,
 	 * because of the destroyed buffer pool.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index bc66c9d..fc9a643 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -59,10 +59,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link PipelinedSubpartition}.
+ */
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
-	/** Executor service for concurrent produce/consume tests */
-	private final static ExecutorService executorService = Executors.newCachedThreadPool();
+	/** Executor service for concurrent produce/consume tests. */
+	private static final ExecutorService executorService = Executors.newCachedThreadPool();
 
 	@AfterClass
 	public static void shutdownExecutorService() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 6bff0f6..d182f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -26,6 +26,9 @@ import org.junit.Test;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link ProducerFailedException}.
+ */
 public class ProducerFailedExceptionTest {
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index d757aa9..2f5a013 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -31,6 +32,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link InputChannel}.
+ */
 public class InputChannelTest {
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index a914733..a67df0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -36,6 +36,11 @@ import java.util.Optional;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
+/**
+ * Input gate helper for unit tests.
+ *
+ * @param <T> type of the value to handle
+ */
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
 
 	private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 1ecb67f..2afd6d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -75,12 +75,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link LocalInputChannel}.
+ */
 public class LocalInputChannelTest {
 
 	/**
 	 * Tests the consumption of multiple subpartitions via local input channels.
 	 *
-	 * <p> Multiple producer tasks produce pipelined partitions, which are consumed by multiple
+	 * <p>Multiple producer tasks produce pipelined partitions, which are consumed by multiple
 	 * tasks via local input channels.
 	 */
 	@Test
@@ -266,20 +269,22 @@ public class LocalInputChannelTest {
 	 * Verifies that concurrent release via the SingleInputGate and re-triggering
 	 * of a partition request works smoothly.
 	 *
-	 * - SingleInputGate acquires its request lock and tries to release all
+	 * <ul>
+	 * <li>SingleInputGate acquires its request lock and tries to release all
 	 * registered channels. When releasing a channel, it needs to acquire
-	 * the channel's shared request-release lock.
-	 * - If a LocalInputChannel concurrently retriggers a partition request via
+	 * the channel's shared request-release lock.</li>
+	 * <li>If a LocalInputChannel concurrently retriggers a partition request via
 	 * a Timer Thread it acquires the channel's request-release lock and calls
 	 * the retrigger callback on the SingleInputGate, which again tries to
-	 * acquire the gate's request lock.
+	 * acquire the gate's request lock.</li>
+	 * </ul>
 	 *
-	 * For certain timings this obviously leads to a deadlock. This test reliably
+	 * <p>For certain timings this obviously leads to a deadlock. This test reliably
 	 * reproduced such a timing (reported in FLINK-5228). This test is pretty much
 	 * testing the buggy implementation and has not much more general value. If it
 	 * becomes obsolete at some point (future greatness ;)), feel free to remove it.
 	 *
-	 * The fix in the end was to to not acquire the channels lock when releasing it
+	 * <p>The fix in the end was to not acquire the channel's lock when releasing it
 	 * and/or not doing any input gate callbacks while holding the channel's lock.
 	 * I decided to do both.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9141b36..ec80459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -336,9 +336,11 @@ public class RemoteInputChannelTest {
 	 * Tests to verify the behaviours of three different processes if the number of available
 	 * buffers is less than required buffers.
 	 *
-	 * 1. Recycle the floating buffer
-	 * 2. Recycle the exclusive buffer
-	 * 3. Decrease the sender's backlog
+	 * <ol>
+	 * <li>Recycle the floating buffer</li>
+	 * <li>Recycle the exclusive buffer</li>
+	 * <li>Decrease the sender's backlog</li>
+	 * </ol>
 	 */
 	@Test
 	public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 7120327..4bf5b22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -164,7 +164,7 @@ public class SingleInputGateTest {
 
 		final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
 		when(iterator.getNextBuffer()).thenReturn(
-			new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false,0, false));
+			new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false, 0, false));
 
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager.createSubpartitionView(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index b0bafd5..33f5709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 2e01225..96f01fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -32,13 +32,16 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link UnionInputGate}.
+ */
 public class UnionInputGateTest {
 
 	/**
 	 * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
 	 * value after receiving all end-of-partition events.
 	 *
-	 * <p> For buffer-or-event instances, it is important to verify that they have been set off to
+	 * <p>For buffer-or-event instances, it is important to verify that they have been set off to
 	 * the correct logical index.
 	 */
 	@Test(timeout = 120 * 1000)
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 33a92e3..51793e5 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -87,11 +87,11 @@ under the License.
 		files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<suppress
-		files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...]
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
 	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<!--Test class copied from the netty project-->
 	<suppress