You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:18 UTC
[flink] 01/09: [hotfix][checkstyle] Remove suppression for
runtime/network.partition
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 542a35a1f57e8cc01b8ebe2b7f6b6f88b59ea875
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 11:29:15 2018 +0200
[hotfix][checkstyle] Remove suppression for runtime/network.partition
---
.../partition/PartitionNotFoundException.java | 3 +++
.../network/partition/PipelinedSubpartition.java | 6 ++---
.../io/network/partition/ResultPartition.java | 26 +++++++++++-----------
.../ResultPartitionConsumableNotifier.java | 4 +++-
.../io/network/partition/ResultPartitionID.java | 2 +-
.../network/partition/ResultPartitionProvider.java | 3 +++
.../io/network/partition/ResultPartitionType.java | 7 ++++--
.../io/network/partition/ResultSubpartition.java | 21 +++++++++--------
.../network/partition/ResultSubpartitionView.java | 1 +
.../network/partition/consumer/InputChannel.java | 18 +++++++--------
.../network/partition/consumer/InputChannelID.java | 3 +++
.../io/network/partition/consumer/InputGate.java | 8 +++----
.../partition/consumer/InputGateMetrics.java | 2 +-
.../partition/consumer/LocalInputChannel.java | 4 ++--
.../partition/consumer/UnknownInputChannel.java | 6 ++---
.../network/partition/InputChannelTestUtils.java | 4 ++--
.../network/partition/InputGateConcurrentTest.java | 7 ++++--
.../network/partition/InputGateFairnessTest.java | 16 +++++++------
.../LegacyPartialConsumePipelinedResultTest.java | 13 ++++++-----
.../PartialConsumePipelinedResultTest.java | 7 ++++--
.../partition/PipelinedSubpartitionTest.java | 7 ++++--
.../partition/ProducerFailedExceptionTest.java | 3 +++
.../partition/consumer/InputChannelTest.java | 4 ++++
.../IteratorWrappingTestSingleInputGate.java | 5 +++++
.../partition/consumer/LocalInputChannelTest.java | 19 ++++++++++------
.../partition/consumer/RemoteInputChannelTest.java | 8 ++++---
.../partition/consumer/SingleInputGateTest.java | 2 +-
.../partition/consumer/TestSingleInputGate.java | 1 +
.../partition/consumer/UnionInputGateTest.java | 5 ++++-
tools/maven/suppressions-runtime.xml | 4 ++--
30 files changed, 135 insertions(+), 84 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
index 7479862..2f78816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
import java.io.IOException;
+/**
+ * Exception for failed partition requests due to non-existing partitions.
+ */
public class PartitionNotFoundException extends IOException {
private static final long serialVersionUID = 0L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c6f3e15..91e0d4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -188,17 +188,17 @@ class PipelinedSubpartition extends ResultSubpartition {
buffer,
isAvailableUnsafe(),
getBuffersInBacklog(),
- _nextBufferIsEvent());
+ nextBufferIsEventUnsafe());
}
}
boolean nextBufferIsEvent() {
synchronized (buffers) {
- return _nextBufferIsEvent();
+ return nextBufferIsEventUnsafe();
}
}
- private boolean _nextBufferIsEvent() {
+ private boolean nextBufferIsEventUnsafe() {
assert Thread.holdsLock(buffers);
return !buffers.isEmpty() && !buffers.peekFirst().isBuffer();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 93e5ba1..b32f73f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -48,17 +48,17 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* A result partition for data produced by a single task.
*
- * <p> This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
+ * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
* a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
* or more {@link ResultSubpartition} instances, which further partition the data depending on the
* number of consuming tasks and the data {@link DistributionPattern}.
*
- * <p> Tasks, which consume a result partition have to request one of its subpartitions. The request
+ * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
* happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
*
* <h2>Life-cycle</h2>
*
- * The life-cycle of each result partition has three (possibly overlapping) phases:
+ * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
* <ol>
* <li><strong>Produce</strong>: </li>
* <li><strong>Consume</strong>: </li>
@@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkState;
*
* <h2>Lazy deployment and updates of consuming tasks</h2>
*
- * Before a consuming task can request the result, it has to be deployed. The time of deployment
+ * <p>Before a consuming task can request the result, it has to be deployed. The time of deployment
* depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined
* results, receivers are deployed as soon as the first buffer is added to the result partition.
* With blocking results on the other hand, receivers are deployed after the partition is finished.
@@ -79,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkState;
public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
-
+
private final String owningTaskName;
private final TaskActions taskActions;
@@ -174,10 +174,10 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
/**
* Registers a buffer pool with this result partition.
- * <p>
- * There is one pool for each result partition, which is shared by all its sub partitions.
- * <p>
- * The pool is registered with the partition *after* it as been constructed in order to conform
+ *
+ * <p>There is one pool for each result partition, which is shared by all its sub partitions.
+ *
+ * <p>The pool is registered with the partition *after* it has been constructed in order to conform
* to the life-cycle of task registrations in the {@link TaskManager}.
*/
public void registerBufferPool(BufferPool bufferPool) {
@@ -276,9 +276,9 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
/**
* Finishes the result partition.
*
- * <p> After this operation, it is not possible to add further data to the result partition.
+ * <p>After this operation, it is not possible to add further data to the result partition.
*
- * <p> For BLOCKING results, this will trigger the deployment of consuming tasks.
+ * <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
*/
public void finish() throws IOException {
boolean success = false;
@@ -366,7 +366,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
/**
* Releases buffers held by this result partition.
*
- * <p> This is a callback from the buffer pool, which is registered for result partitions, which
+ * <p>This is a callback from the buffer pool, which is registered for result partitions, which
* are back pressure-free.
*/
@Override
@@ -395,7 +395,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
/**
* Pins the result partition.
*
- * <p> The partition can only be released after each subpartition has been consumed once per pin
+ * <p>The partition can only be released after each subpartition has been consumed once per pin
* operation.
*/
void pin() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 02212ce..10eb086 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -18,10 +18,12 @@
package org.apache.flink.runtime.io.network.partition;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.taskmanager.TaskActions;
+/**
+ * Interface for notifications about consumable partitions.
+ */
public interface ResultPartitionConsumableNotifier {
void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index b84c33b..cee79a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
/**
* Runtime identifier of a produced {@link IntermediateResultPartition}.
*
- * <p> In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
+ * <p>In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
* identify a result partition. It needs to be associated with the producing task as well to ensure
* correct tracking of failed/restarted tasks.
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index db72d63..faeaaf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
import java.io.IOException;
+/**
+ * Interface for creating result partitions.
+ */
public interface ResultPartitionProvider {
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 256387c..f62dbeeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.io.network.partition;
+/**
+ * Type of a result partition.
+ */
public enum ResultPartitionType {
BLOCKING(false, false, false),
@@ -27,12 +30,12 @@ public enum ResultPartitionType {
/**
* Pipelined partitions with a bounded (local) buffer pool.
*
- * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
+ * <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
* data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
* overall network buffer pool size, this, however, still allows to be flexible with regards
* to the total number of partitions by selecting an appropriately big network buffer pool size.
*
- * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
+ * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
* no checkpoint barriers.
*/
PIPELINED_BOUNDED(true, true, true);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index adc0ed3..58a1402 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -43,16 +43,16 @@ public abstract class ResultSubpartition {
/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
- /** The number of non-event buffers currently in this subpartition */
+ /** The number of non-event buffers currently in this subpartition. */
@GuardedBy("buffers")
private int buffersInBacklog;
// - Statistics ----------------------------------------------------------
- /** The total number of buffers (both data and event buffers) */
+ /** The total number of buffers (both data and event buffers). */
private long totalNumberOfBuffers;
- /** The total number of bytes (both data and event buffers) */
+ /** The total number of bytes (both data and event buffers). */
private long totalNumberOfBytes;
public ResultSubpartition(int index, ResultPartition parent) {
@@ -102,19 +102,19 @@ public abstract class ResultSubpartition {
* @throws IOException
* thrown in case of errors while adding the buffer
*/
- abstract public boolean add(BufferConsumer bufferConsumer) throws IOException;
+ public abstract boolean add(BufferConsumer bufferConsumer) throws IOException;
- abstract public void flush();
+ public abstract void flush();
- abstract public void finish() throws IOException;
+ public abstract void finish() throws IOException;
- abstract public void release() throws IOException;
+ public abstract void release() throws IOException;
- abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
+ public abstract ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
abstract int releaseMemory() throws IOException;
- abstract public boolean isReleased();
+ public abstract boolean isReleased();
/**
* Gets the number of non-event buffers in this subpartition.
@@ -132,7 +132,7 @@ public abstract class ResultSubpartition {
* This method must not acquire locks or interfere with the task and network threads in
* any way.
*/
- abstract public int unsynchronizedGetNumberOfQueuedBuffers();
+ public abstract int unsynchronizedGetNumberOfQueuedBuffers();
/**
* Decreases the number of non-event buffers by one after fetching a non-event
@@ -198,7 +198,6 @@ public abstract class ResultSubpartition {
return buffersInBacklog;
}
-
public boolean nextBufferIsEvent() {
return nextBufferIsEvent;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index b1ccd63..a755955 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import javax.annotation.Nullable;
+
import java.io.IOException;
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 2a7cedf..a08ecc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -34,8 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An input channel consumes a single {@link ResultSubpartitionView}.
- * <p>
- * For each channel, the consumption life cycle is as follows:
+ *
+ * <p>For each channel, the consumption life cycle is as follows:
* <ol>
* <li>{@link #requestSubpartition(int)}</li>
* <li>{@link #getNextBuffer()}</li>
@@ -66,7 +66,7 @@ public abstract class InputChannel {
protected final Counter numBuffersIn;
- /** The current backoff (in ms) */
+ /** The current backoff (in ms). */
private int currentBackoff;
protected InputChannel(
@@ -111,12 +111,12 @@ public abstract class InputChannel {
/**
* Notifies the owning {@link SingleInputGate} that this channel became non-empty.
- *
+ *
* <p>This is guaranteed to be called only when a Buffer was added to a previously
* empty input channel. The notion of empty is atomically consistent with the flag
* {@link BufferAndAvailability#moreAvailable()} when polling the next buffer
* from this channel.
- *
+ *
* <p><b>Note:</b> When the input channel observes an exception, this
* method is called regardless of whether the channel was empty before. That ensures
* that the parent InputGate will always be notified about the exception.
@@ -132,8 +132,8 @@ public abstract class InputChannel {
/**
* Requests the queue with the specified index of the source intermediate
* result partition.
- * <p>
- * The queue index to request depends on which sub task the channel belongs
+ *
+ * <p>The queue index to request depends on which sub task the channel belongs
* to and is specified by the consumer of this channel.
*/
abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
@@ -149,8 +149,8 @@ public abstract class InputChannel {
/**
* Sends a {@link TaskEvent} back to the task producing the consumed result partition.
- * <p>
- * <strong>Important</strong>: The producing task has to be running to receive backwards events.
+ *
+ * <p><strong>Important</strong>: The producing task has to be running to receive backwards events.
* This means that the result type needs to be pipelined and the task logic has to ensure that
* the producer will wait for all backwards events. Otherwise, this will lead to an Exception
* at runtime.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
index ceeb83d..c1886de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
@@ -22,6 +22,9 @@ import org.apache.flink.util.AbstractID;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+/**
+ * Identifier for input channels.
+ */
public class InputChannelID extends AbstractID {
private static final long serialVersionUID = 1L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c78abb5..6e59f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -26,10 +26,10 @@ import java.util.Optional;
/**
* An input gate consumes one or more partitions of a single produced intermediate result.
*
- * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these
* partitions is furthermore partitioned into one or more subpartitions.
*
- * <p> As an example, consider a map-reduce program, where the map operator produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator produces data and the
* reduce operator consumes the produced data.
*
* <pre>{@code
@@ -38,7 +38,7 @@ import java.util.Optional;
* +-----+ +---------------------+ +--------+
* }</pre>
*
- * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its
* producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
* subpartitions.
*
@@ -59,7 +59,7 @@ import java.util.Optional;
* +-----------------------------------------+
* }</pre>
*
- * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting
* in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
* subpartitions -- one for each parallel reduce subtask. As shown in the Figure, each reduce task
* will have an input gate attached to it. This will provide its input, which will consist of one
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 69af455..ebb8b9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -40,7 +40,7 @@ public class InputGateMetrics {
// ------------------------------------------------------------------------
- // these methods are package private to make access from the nested classes faster
+ // these methods are package private to make access from the nested classes faster
/**
* Iterates over all input channels and collects the total number of queued buffers in a
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4b3a8ff..e7986bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -57,7 +57,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
/** Task event dispatcher for backwards events. */
private final TaskEventDispatcher taskEventDispatcher;
- /** The consumed subpartition */
+ /** The consumed subpartition. */
private volatile ResultSubpartitionView subpartitionView;
private volatile boolean isReleased;
@@ -245,7 +245,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
}
/**
- * Releases the partition reader
+ * Releases the partition reader.
*/
@Override
void releaseAllResources() throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 20a7aed..c0e9177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import java.io.IOException;
import java.util.Optional;
@@ -89,8 +89,8 @@ class UnknownInputChannel extends InputChannel {
/**
* Returns <code>false</code>.
- * <p>
- * <strong>Important</strong>: It is important that the method correctly
+ *
+ * <p><strong>Important</strong>: It is important that the method correctly
* always <code>false</code> for unknown input channels in order to not
* finish the consumption of an intermediate result partition early.
*/
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index f73ede7..f7db40b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -59,7 +59,7 @@ public class InputChannelTestUtils {
return manager;
}
-
+
public static ConnectionManager createDummyConnectionManager() throws Exception {
final PartitionRequestClient mockClient = mock(PartitionRequestClient.class);
@@ -71,6 +71,6 @@ public class InputChannelTestUtils {
// ------------------------------------------------------------------------
- /** This class is not meant to be instantiated */
+ /** This class is not meant to be instantiated. */
private InputChannelTestUtils() {}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 73f3cfb..5f5728d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -46,6 +46,9 @@ import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
+/**
+ * Concurrency tests for input gates.
+ */
public class InputGateConcurrentTest {
@Test
@@ -192,8 +195,8 @@ public class InputGateConcurrentTest {
// testing threads
// ------------------------------------------------------------------------
- private static abstract class Source {
-
+ private abstract static class Source {
+
abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 82a27cc..6691875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -58,6 +58,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+/**
+ * Tests verifying fairness in input gates.
+ */
public class InputGateFairnessTest {
@Test
@@ -115,7 +118,7 @@ public class InputGateFairnessTest {
max = Math.max(max, size);
}
- assertTrue(max == min || max == min+1);
+ assertTrue(max == min || max == (min + 1));
}
assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -207,11 +210,11 @@ public class InputGateFairnessTest {
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
- gate, i, new ResultPartitionID(), mock(ConnectionID.class),
+ gate, i, new ResultPartitionID(), mock(ConnectionID.class),
connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
channels[i] = channel;
-
+
for (int p = 0; p < buffersPerChannel; p++) {
channel.onBuffer(mockBuffer, p, -1);
}
@@ -233,7 +236,7 @@ public class InputGateFairnessTest {
max = Math.max(max, size);
}
- assertTrue(max == min || max == min+1);
+ assertTrue(max == min || max == (min + 1));
}
assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -287,7 +290,7 @@ public class InputGateFairnessTest {
max = Math.max(max, size);
}
- assertTrue(max == min || max == min+1);
+ assertTrue(max == min || max == (min + 1));
if (i % (2 * numChannels) == 0) {
// add three buffers to each channel, in random order
@@ -336,7 +339,7 @@ public class InputGateFairnessTest {
partitions[i].onBuffer(buffer, sequenceNumbers[i]++, -1);
}
}
-
+
// ------------------------------------------------------------------------
private static class FairnessVerifyingInputGate extends SingleInputGate {
@@ -372,7 +375,6 @@ public class InputGateFairnessTest {
this.uniquenessChecker = new HashSet<>();
}
-
@Override
public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
synchronized (channelsWithData) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
index aecab75..b83067c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -40,14 +40,17 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+/**
+ * Test for consuming a pipelined result only partially.
+ */
public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
// Test configuration
- private final static int NUMBER_OF_TMS = 1;
- private final static int NUMBER_OF_SLOTS_PER_TM = 1;
- private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+ private static final int NUMBER_OF_TMS = 1;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
- private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+ private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
private static TestingCluster flink;
@@ -72,7 +75,7 @@ public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
/**
* Tests a fix for FLINK-1930.
*
- * <p> When consuming a pipelined result only partially, is is possible that local channels
+ * <p>When consuming a pipelined result only partially, it is possible that local channels
* release the buffer pool, which is associated with the result partition, too early. If the
* producer is still producing data when this happens, it runs into an IllegalStateException,
* because of the destroyed buffer pool.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 2eec34c..f6689fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -40,6 +40,9 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+/**
+ * Test for consuming a pipelined result only partially.
+ */
public class PartialConsumePipelinedResultTest extends TestLogger {
// Test configuration
@@ -78,8 +81,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
/**
* Tests a fix for FLINK-1930.
*
- * <p> When consuming a pipelined result only partially, is is possible that local channels
- * release the buffer pool, which is associated with the result partition, too early. If the
+ * <p>When consuming a pipelined result only partially, it is possible that local channels
+ * release the buffer pool, which is associated with the result partition, too early. If the
* producer is still producing data when this happens, it runs into an IllegalStateException,
* because of the destroyed buffer pool.
*
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index bc66c9d..fc9a643 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -59,10 +59,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for {@link PipelinedSubpartition}.
+ */
public class PipelinedSubpartitionTest extends SubpartitionTestBase {
- /** Executor service for concurrent produce/consume tests */
- private final static ExecutorService executorService = Executors.newCachedThreadPool();
+ /** Executor service for concurrent produce/consume tests. */
+ private static final ExecutorService executorService = Executors.newCachedThreadPool();
@AfterClass
public static void shutdownExecutorService() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 6bff0f6..d182f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -26,6 +26,9 @@ import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link ProducerFailedException}.
+ */
public class ProducerFailedExceptionTest {
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index d757aa9..2f5a013 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
import org.junit.Test;
import java.io.IOException;
@@ -31,6 +32,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+/**
+ * Tests for {@link InputChannel}.
+ */
public class InputChannelTest {
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index a914733..a67df0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -36,6 +36,11 @@ import java.util.Optional;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+/**
+ * Input gate helper for unit tests.
+ *
+ * @param <T> type of the value to handle
+ */
public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 1ecb67f..2afd6d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -75,12 +75,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the {@link LocalInputChannel}.
+ */
public class LocalInputChannelTest {
/**
* Tests the consumption of multiple subpartitions via local input channels.
*
- * <p> Multiple producer tasks produce pipelined partitions, which are consumed by multiple
+ * <p>Multiple producer tasks produce pipelined partitions, which are consumed by multiple
* tasks via local input channels.
*/
@Test
@@ -266,20 +269,22 @@ public class LocalInputChannelTest {
* Verifies that concurrent release via the SingleInputGate and re-triggering
* of a partition request works smoothly.
*
- * - SingleInputGate acquires its request lock and tries to release all
+ * <ul>
+ * <li>SingleInputGate acquires its request lock and tries to release all
* registered channels. When releasing a channel, it needs to acquire
- * the channel's shared request-release lock.
- * - If a LocalInputChannel concurrently retriggers a partition request via
+ * the channel's shared request-release lock.</li>
+ * <li>If a LocalInputChannel concurrently retriggers a partition request via
* a Timer Thread it acquires the channel's request-release lock and calls
* the retrigger callback on the SingleInputGate, which again tries to
- * acquire the gate's request lock.
+ * acquire the gate's request lock.</li>
+ * </ul>
*
- * For certain timings this obviously leads to a deadlock. This test reliably
+ * <p>For certain timings this obviously leads to a deadlock. This test reliably
* reproduced such a timing (reported in FLINK-5228). This test is pretty much
* testing the buggy implementation and has not much more general value. If it
* becomes obsolete at some point (future greatness ;)), feel free to remove it.
*
- * The fix in the end was to to not acquire the channels lock when releasing it
+ * <p>The fix in the end was to not acquire the channel's lock when releasing it
* and/or not doing any input gate callbacks while holding the channel's lock.
* I decided to do both.
*/
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9141b36..ec80459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -336,9 +336,11 @@ public class RemoteInputChannelTest {
* Tests to verify the behaviours of three different processes if the number of available
* buffers is less than required buffers.
*
- * 1. Recycle the floating buffer
- * 2. Recycle the exclusive buffer
- * 3. Decrease the sender's backlog
+ * <ol>
+ * <li>Recycle the floating buffer</li>
+ * <li>Recycle the exclusive buffer</li>
+ * <li>Decrease the sender's backlog</li>
+ * </ol>
*/
@Test
public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 7120327..4bf5b22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -164,7 +164,7 @@ public class SingleInputGateTest {
final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
when(iterator.getNextBuffer()).thenReturn(
- new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false,0, false));
+ new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false, 0, false));
final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
when(partitionManager.createSubpartitionView(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index b0bafd5..33f5709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 2e01225..96f01fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -32,13 +32,16 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+/**
+ * Tests for {@link UnionInputGate}.
+ */
public class UnionInputGateTest {
/**
* Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
* value after receiving all end-of-partition events.
*
- * <p> For buffer-or-event instances, it is important to verify that they have been set off to
+ * <p>For buffer-or-event instances, it is important to verify that they have been set off to
* the correct logical index.
*/
@Test(timeout = 120 * 1000)
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 33a92e3..51793e5 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -87,11 +87,11 @@ under the License.
files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
checks="AvoidStarImport|UnusedImports"/>
<suppress
- files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+ files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...]
<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
<suppress
- files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+ files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
checks="AvoidStarImport|UnusedImports"/>
<!--Test class copied from the netty project-->
<suppress