You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/04/26 10:07:30 UTC
[08/23] flink git commit: [FLINK-6107] Enable Javadoc checks in
streaming checkstyle
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 57beb93..5072374 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -30,6 +30,9 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import java.util.ArrayList;
/**
+ * Special window operator implementation for windows that fire at the same time for all keys with
+ * accumulating windows.
+ *
* @deprecated Deprecated in favour of the generic {@link WindowOperator}. This was an
* optimized implementation used for aligned windows.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index 84686a7..d26ab9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -25,6 +25,9 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
+/**
+ * Key/value map organized in panes for aggregating windows (with a reduce function).
+ */
@Internal
public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
@@ -33,7 +36,9 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
private final ReduceFunction<Type> reducer;
/**
- * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
+ * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries
+ * have (zero).
+ */
private long evaluationPass = 1L;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 1be9650..51803e6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
/**
+ * Special window operator implementation for windows that fire at the same time for all keys with
+ * aggregating windows.
+ *
* @deprecated Deprecated in favour of the generic {@link WindowOperator}. This was an
* optimized implementation used for aligned windows.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 85451a5..0d590c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -48,9 +48,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link WindowOperator} that also allows an {@link Evictor} to be used.
*
- * <p>
- * The {@code Evictor} is used to remove elements from a pane before/after the evaluation of WindowFunction and
- * after the window evaluation gets triggered by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ * <p>The {@code Evictor} is used to remove elements from a pane before/after the evaluation of
+ * {@link InternalWindowFunction} and after the window evaluation gets triggered by a
+ * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
*
* @param <K> The type of key returned by the {@code KeySelector}.
* @param <IN> The type of the incoming elements.
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
index b0f4f46..4e50d82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
@@ -30,46 +30,48 @@ import java.util.NoSuchElementException;
/**
* A special Hash Map implementation that can be traversed efficiently in sync with other
* hash maps.
- * <p>
- * The differences between this hash map and Java's "java.util.HashMap" are:
+ *
+ * <p>The differences between this hash map and Java's "java.util.HashMap" are:
* <ul>
* <li>A different hashing scheme. This implementation uses extensible hashing, meaning that
* each hash table growth takes one more lower hash code bit into account, and values that were
- * formerly in the same bucket will afterwards be in the two adjacent buckets.</li>
+ * formerly in the same bucket will afterwards be in the two adjacent buckets.
* <li>This allows an efficient traversal of multiple hash maps together, even though the maps are
* of different sizes.</li>
- * <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"</li>
- * <li>The map supports no removal/shrinking.</li>
+ * <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"
+ * <li>The map supports no removal/shrinking.
* </ul>
*/
@Internal
public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
- /** The minimum table capacity, 64 entries */
+ /** The minimum table capacity, 64 entries. */
private static final int MIN_CAPACITY = 0x40;
- /** The maximum possible table capacity, the largest positive power of
- * two in the 32bit signed integer value range */
+ /**
+ * The maximum possible table capacity, the largest positive power of two in the 32bit signed
+ * integer value range.
+ */
private static final int MAX_CAPACITY = 0x40000000;
- /** The number of bits used for table addressing when table is at max capacity */
+ /** The number of bits used for table addressing when table is at max capacity. */
private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY);
// ------------------------------------------------------------------------
- /** The hash index, as an array of entries */
+ /** The hash index, as an array of entries. */
private Entry<K, V>[] table;
- /** The number of bits by which the hash code is shifted right, to find the bucket */
+ /** The number of bits by which the hash code is shifted right, to find the bucket. */
private int shift;
- /** The number of elements in the hash table */
+ /** The number of elements in the hash table. */
private int numElements;
- /** The number of elements above which the hash table needs to grow */
+ /** The number of elements above which the hash table needs to grow. */
private int rehashThreshold;
- /** The base-2 logarithm of the table capacity */
+ /** The base-2 logarithm of the table capacity. */
private int log2size;
// ------------------------------------------------------------------------
@@ -141,8 +143,8 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
/**
* Inserts a value for the given key, if no value is yet contained for that key. Otherwise,
* returns the value currently contained for the key.
- * <p>
- * The value that is inserted in case that the key is not contained, yet, is lazily created
+ *
+ * <p>The value that is inserted in case that the key is not contained, yet, is lazily created
* using the given factory.
*
* @param key The key to insert.
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
index 9c63a69..108162b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,13 +28,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@PublicEvolving
public class TimestampedValue<T> {
- /** The actual value held by this record */
+ /** The actual value held by this record. */
private T value;
- /** The timestamp of the record */
+ /** The timestamp of the record. */
private long timestamp;
- /** Flag whether the timestamp is actually set */
+ /** Flag whether the timestamp is actually set. */
private boolean hasTimestamp;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 7069251..e36d485 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -88,15 +88,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* An operator that implements the logic for windowing based on a {@link WindowAssigner} and
* {@link Trigger}.
*
- * <p>
- * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
+ * <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
* assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
* is put into panes. A pane is the bucket of elements that have the same key and same
* {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
* {@code WindowAssigner}.
*
- * <p>
- * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
+ * <p>Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
* the contents of the pane should be processed to emit results. When a trigger fires,
* the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
* the pane to which the {@code Trigger} belongs.
@@ -161,10 +159,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
private transient InternalMergingState<W, IN, ACC> windowMergingState;
- /** The state that holds the merging window metadata (the sets that describe what is merged) */
+ /** The state that holds the merging window metadata (the sets that describe what is merged). */
private transient InternalListState<VoidNamespace, Tuple2<W, W>> mergingSetsState;
- /** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */
+ /**
+ * This is given to the {@code InternalWindowFunction} for emitting elements with a given
+ * timestamp.
+ */
protected transient TimestampedCollector<OUT> timestampedCollector;
protected transient Context triggerContext = new Context(null, null);
@@ -599,7 +600,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
/**
- * Write skipped late arriving element to SideOutput
+ * Write skipped late arriving element to SideOutput.
*
* @param element skipped late arriving element to side output
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 9834480..4055c4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -49,6 +49,11 @@ public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends
*/
void clear(W window, InternalWindowContext context) throws Exception;
+ /**
+ * A context for {@link InternalWindowFunction}, similar to
+ * {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context} but
+ * for internal use.
+ */
interface InternalWindowContext extends java.io.Serializable {
KeyedStateStore windowState();
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
index c59c88a..d27246e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
@@ -23,7 +23,7 @@ package org.apache.flink.streaming.runtime.partitioner;
* of the stream transformation. The configure method is called by the StreamGraph when adding
* internal edges.
*
- * This interface is required since the stream partitioners are instantiated eagerly. Due to that
+ * <p>This interface is required since the stream partitioners are instantiated eagerly. Due to that
* the maximum parallelism might not have been determined and needs to be set at a stage when the
* maximum parallelism could have been determined.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
index bc77d54..67eaa73 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
- * Partitioner that sends all elements to the downstream operator with subtask ID=0;
+ * Partitioner that sends all elements to the downstream operator with subtask ID=0.
*
* @param <T> Type of the elements in the Stream being partitioned
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index 6b4a810..415a3c7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -23,6 +23,9 @@ import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+/**
+ * A special {@link ChannelSelector} for use in streaming programs.
+ */
@Internal
public abstract class StreamPartitioner<T> implements
ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
index 714bdae..131658d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
@@ -24,7 +24,7 @@ import org.apache.flink.annotation.PublicEvolving;
* Special record type carrying a timestamp of its creation time at a source operator
* and the vertexId and subtask index of the operator.
*
- * At sinks, the marker can be used to approximate the time a record needs to travel
+ * <p>At sinks, the marker can be used to approximate the time a record needs to travel
* through the dataflow.
*/
@PublicEvolving
@@ -32,7 +32,7 @@ public final class LatencyMarker extends StreamElement {
// ------------------------------------------------------------------------
- /** The time the latency mark is denoting */
+ /** The time the latency mark is denoting. */
private final long markedTime;
private final int vertexID;
@@ -40,7 +40,7 @@ public final class LatencyMarker extends StreamElement {
private final int subtaskIndex;
/**
- * Creates a latency mark with the given timestamp
+ * Creates a latency mark with the given timestamp.
*/
public LatencyMarker(long markedTime, int vertexID, int subtaskIndex) {
this.markedTime = markedTime;
@@ -49,7 +49,7 @@ public final class LatencyMarker extends StreamElement {
}
/**
- * Returns the timestamp marked by the LatencyMarker
+ * Returns the timestamp marked by the LatencyMarker.
*/
public long getMarkedTime() {
return markedTime;
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 3db649a..f2b8aec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -30,10 +30,10 @@ import java.io.IOException;
import static java.util.Objects.requireNonNull;
/**
- * Serializer for {@link StreamRecord}, {@link Watermark}, {@link LatencyMarker}, and {@link StreamStatus}.
+ * Serializer for {@link StreamRecord}, {@link Watermark}, {@link LatencyMarker}, and
+ * {@link StreamStatus}.
*
- * <p>
- * This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the
+ * <p>This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the
* stream task/operator level for transmitting StreamRecords and Watermarks.
*
* @param <T> The type of value in the StreamRecord
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index da606a9..0395897 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -27,13 +27,13 @@ import org.apache.flink.annotation.Internal;
@Internal
public final class StreamRecord<T> extends StreamElement {
- /** The actual value held by this record */
+ /** The actual value held by this record. */
private T value;
- /** The timestamp of the record */
+ /** The timestamp of the record. */
private long timestamp;
- /** Flag whether the timestamp is actually set */
+ /** Flag whether the timestamp is actually set. */
private boolean hasTimestamp;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
index f17d240..1327c83 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java
@@ -49,13 +49,16 @@ public class StatusWatermarkValve {
// Runtime state for watermark & stream status output determination
// ------------------------------------------------------------------------
- /** Array of current status of all input channels. Changes as watermarks & stream statuses are fed into the valve */
+ /**
+ * Array of current status of all input channels. Changes as watermarks & stream statuses are
+ * fed into the valve.
+ */
private final InputChannelStatus[] channelStatuses;
- /** The last watermark emitted from the valve */
+ /** The last watermark emitted from the valve. */
private long lastOutputWatermark;
- /** The last stream status emitted from the valve */
+ /** The last stream status emitted from the valve. */
private StreamStatus lastOutputStreamStatus;
/**
@@ -172,20 +175,25 @@ public class StatusWatermarkValve {
}
/**
- * An {@code InputChannelStatus} keeps track of an input channel's last watermark, stream status, and whether or not
- * the channel's current watermark is aligned with the overall watermark output from the valve.
+ * An {@code InputChannelStatus} keeps track of an input channel's last watermark, stream
+ * status, and whether or not the channel's current watermark is aligned with the overall
+ * watermark output from the valve.
*
- * There are 2 situations where a channel's watermark is not considered aligned:
- * - the current stream status of the channel is idle
- * - the stream status has resumed to be active, but the watermark of the channel hasn't caught up to the
- * last output watermark from the valve yet.
+ * <p>There are 2 situations where a channel's watermark is not considered aligned:
+ * <ul>
+ * <li>the current stream status of the channel is idle
+ * <li>the stream status has resumed to be active, but the watermark of the channel hasn't
+ * caught up to the last output watermark from the valve yet.
+ * </ul>
*/
private static class InputChannelStatus {
private long watermark;
private StreamStatus streamStatus;
private boolean isWatermarkAligned;
- /** Utility to check if at least one channel in a given array of input channels is active */
+ /**
+ * Utility to check if at least one channel in a given array of input channels is active.
+ */
private static boolean hasActiveChannels(InputChannelStatus[] channelStatuses) {
for (InputChannelStatus status : channelStatuses) {
if (status.streamStatus.isActive()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
index e82fad0..8e58340 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
@@ -52,8 +52,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
* {@link StreamStatus#ACTIVE}, the task is active.</li>
* </ul>
*
- * <p>
- * Stream Status elements received at downstream tasks also affect and control how their operators process and advance
+ * <p>Stream Status elements received at downstream tasks also affect and control how their operators process and advance
* their watermarks. The below describes the effects (the logic is implemented as a {@link StatusWatermarkValve} which
* downstream tasks should use for such purposes):
*
@@ -64,16 +63,15 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
* active. However, for watermarks, since there may be watermark generators that might produce watermarks
* anywhere in the middle of topologies regardless of whether there are input data at the operator, the current
* status of the task must be checked before forwarding watermarks emitted from
- * an operator. It the status is actually idle, the watermark must be blocked.</li>
+ * an operator. If the status is actually idle, the watermark must be blocked.
*
* <li>For downstream tasks with multiple input streams, the watermarks of input streams that are temporarily idle,
* or has resumed to be active but its watermark is behind the overall min watermark of the operator, should not
* be accounted for when deciding whether or not to advance the watermark and propagated through the operator
- * chain.</li>
+ * chain.
* </ul>
*
- * <p>
- * Note that to notify downstream tasks that a source task is permanently closed and will no longer send any more
+ * <p>Note that to notify downstream tasks that a source task is permanently closed and will no longer send any more
* elements, the source should still send a {@link Watermark#MAX_WATERMARK} instead of {@link StreamStatus#IDLE}.
* Stream Status elements only serve as markers for temporary status.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
index cda0511..39d3ed2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index e04d316..0343b54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -25,6 +25,9 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+/**
+ * A {@link StreamTask} for executing a {@link OneInputStreamOperator}.
+ */
@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index be4b456..aced2f4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -203,7 +203,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
/**
- *
* This method should be called before finishing the record emission, to make sure any data
* that is still buffered will be sent. It also ensures that all data sending related
* exceptions are recognized.
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
index 7588c3d..1164ea0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -29,10 +29,10 @@ public interface ProcessingTimeCallback {
/**
* This method is invoked with the timestamp for which the trigger was scheduled.
- * <p>
- * If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled due
- * to a garbage collection), the timestamp supplied to this function will still be the original
- * timestamp for which the trigger was scheduled.
+ *
+ * <p>If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled
+ * due to a garbage collection), the timestamp supplied to this function will still be the
+ * original timestamp for which the trigger was scheduled.
*
* @param timestamp The timestamp for which the trigger event was scheduled.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1829140..a3913ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -23,9 +23,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
/**
- * Task for executing streaming sources.
+ * {@link StreamTask} for executing a {@link StreamSource}.
*
- * One important aspect of this is that the checkpointing and the emission of elements must never
+ * <p>One important aspect of this is that the checkpointing and the emission of elements must never
* occur at the same time. The execution must be serial. This is achieved by having the contract
* with the StreamFunction that it must only modify its state or emit elements in
* a synchronized block that locks on the lock Object. Also, the modification of the state
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 59edb8f..d05ec37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -30,6 +30,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A special {@link StreamTask} that is used for executing feedback edges. This is used in
+ * combination with {@link StreamIterationTail}.
+ */
@Internal
public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index e40f834..7326a80 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -32,6 +32,10 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A special {@link StreamTask} that is used for executing feedback edges. This is used in
+ * combination with {@link StreamIterationHead}.
+ */
@Internal
public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7de6c1a..e73c825 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -87,7 +87,7 @@ import java.util.concurrent.atomic.AtomicReference;
* produced by the operators at the ends of the operator chain. Note that the chain may fork and
* thus have multiple ends.
*
- * The life cycle of the task is set up as follows:
+ * <p>The life cycle of the task is set up as follows:
* <pre>{@code
* -- setInitialState -> provides state of all operators in the chain
*
@@ -105,7 +105,7 @@ import java.util.concurrent.atomic.AtomicReference;
* +----> task specific cleanup()
* }</pre>
*
- * <p> The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
+ * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
* {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
* are called concurrently.
*
@@ -117,27 +117,27 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements StatefulTask, AsyncExceptionHandler {
- /** The thread group that holds all trigger timer threads */
+ /** The thread group that holds all trigger timer threads. */
public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
- /** The logger used by the StreamTask and its subclasses */
+ /** The logger used by the StreamTask and its subclasses. */
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
// ------------------------------------------------------------------------
/**
- * All interaction with the {@code StreamOperator} must be synchronized on this lock object to ensure that
- * we don't have concurrent method calls that void consistent checkpoints.
+ * All interaction with the {@code StreamOperator} must be synchronized on this lock object to
+ * ensure that we don't have concurrent method calls that void consistent checkpoints.
*/
private final Object lock = new Object();
- /** the head operator that consumes the input streams of this task */
+ /** The head operator that consumes the input streams of this task. */
protected OP headOperator;
- /** The chain of operators executed by this task */
+ /** The chain of operators executed by this task. */
protected OperatorChain<OUT, OP> operatorChain;
- /** The configuration of this streaming task */
+ /** The configuration of this streaming task. */
private StreamConfig configuration;
/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
@@ -153,22 +153,24 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
*/
private ProcessingTimeService timerService;
- /** The map of user-defined accumulators of this task */
+ /** The map of user-defined accumulators of this task. */
private Map<String, Accumulator<?, ?>> accumulatorMap;
private TaskStateHandles restoreStateHandles;
- /** The currently active background materialization threads */
+ /** The currently active background materialization threads. */
private final CloseableRegistry cancelables = new CloseableRegistry();
- /** Flag to mark the task "in operation", in which case check
- * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
+ /**
+ * Flag to mark the task "in operation", in which case check needs to be initialized to true,
+ * so that early cancel() before invoke() behaves correctly.
+ */
private volatile boolean isRunning;
- /** Flag to mark this task as canceled */
+ /** Flag to mark this task as canceled. */
private volatile boolean canceled;
- /** Thread pool for async snapshot workers */
+ /** Thread pool for async snapshot workers. */
private ExecutorService asyncOperationsThreadPool;
// ------------------------------------------------------------------------
@@ -363,11 +365,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
/**
- * Execute the operator-specific {@link StreamOperator#open()} method in each
- * of the operators in the chain of this {@link StreamTask}. </b> Opening happens
- * from <b>tail to head</b> operator in the chain, contrary to
- * {@link StreamOperator#close()} which happens <b>head to tail</b>
- * operator (see {@link #closeAllOperators()}.
+ * Execute {@link StreamOperator#open()} of each operator in the chain of this
+ * {@link StreamTask}. Opening happens from <b>tail to head</b> operator in the chain, contrary
+ * to {@link StreamOperator#close()} which happens <b>head to tail</b>
+ * (see {@link #closeAllOperators()}).
*/
private void openAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
@@ -378,10 +379,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
/**
- * Execute the operator-specific {@link StreamOperator#close()} method in each
- * of the operators in the chain of this {@link StreamTask}. </b> Closing happens
- * from <b>head to tail</b> operator in the chain, contrary to
- * {@link StreamOperator#open()} which happens <b>tail to head</b> operator
+ * Execute {@link StreamOperator#close()} of each operator in the chain of this
+ * {@link StreamTask}. Closing happens from <b>head to tail</b> operator in the chain,
+ * contrary to {@link StreamOperator#open()} which happens <b>tail to head</b>
* (see {@link #openAllOperators()}).
*/
private void closeAllOperators() throws Exception {
@@ -397,9 +397,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
/**
- * Execute the operator-specific {@link StreamOperator#dispose()} method in each
- * of the operators in the chain of this {@link StreamTask}. </b> Disposing happens
- * from <b>tail to head</b> operator in the chain.
+ * Execute {@link StreamOperator#dispose()} of each operator in the chain of this
+ * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
*/
private void tryDisposeAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
@@ -416,11 +415,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
/**
- * Execute the operator-specific {@link StreamOperator#dispose()} method in each
- * of the operators in the chain of this {@link StreamTask}. </b> Disposing happens
- * from <b>tail to head</b> operator in the chain.
+ * Execute {@link StreamOperator#dispose()} of each operator in the chain of this
+ * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
*
- * The difference with the {@link #tryDisposeAllOperators()} is that in case of an
+ * <p>The difference with the {@link #tryDisposeAllOperators()} is that in case of an
* exception, this method catches it and logs the message.
*/
private void disposeAllOperators() {
@@ -442,8 +440,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
* The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
* shutdown method was never called.
*
- * <p>
- * This should not be relied upon! It will cause shutdown to happen much later than if manual
+ * <p>This should not be relied upon! It will cause shutdown to happen much later than if manual
* shutdown is attempted, and cause threads to linger for longer than needed.
*/
@Override
@@ -818,7 +815,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
* Handles an exception thrown by another thread (e.g. a TriggerTask),
* other than the one executing the main task by failing the task entirely.
*
- * In more detail, it marks task execution failed for an external reason
+ * <p>In more detail, it marks task execution failed for an external reason
* (a reason other than the task code itself throwing an exception). If the task
* is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the
* task is already canceling this does nothing. Otherwise it sets the state to
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index abcb19b..4c2d731 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -49,10 +49,10 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
/** The containing task that owns this time service provider. */
private final AsyncExceptionHandler task;
- /** The lock that timers acquire upon triggering */
+ /** The lock that timers acquire upon triggering. */
private final Object checkpointLock;
- /** The executor service that schedules and calls the triggers of this task*/
+ /** The executor service that schedules and calls the triggers of this task. */
private final ScheduledThreadPoolExecutor timerService;
private final AtomicInteger status;
@@ -92,7 +92,8 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
}
/**
- * Registers a task to be executed no sooner than time {@code timestamp}, but without strong guarantees of order
+ * Registers a task to be executed no sooner than time {@code timestamp}, but without strong
+ * guarantees of order.
*
* @param timestamp Time when the task is to be enabled (in processing time)
* @param target The task to be executed
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
index 2e39725..0bd78ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 71346b8..50aefc8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -28,6 +28,9 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
+/**
+ * A {@link StreamTask} for executing a {@link TwoInputStreamOperator}.
+ */
@Internal
public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
index a40ae3a..e5b7908 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
@@ -20,8 +20,8 @@
* executable stream consumers and producers that are scheduled by the distributed
* dataflow runtime. Each task occupies one execution slot and is run by an
* executing thread.
- * <p>
- * The tasks merely set up the distributed stream coordination and the checkpointing.
+ *
+ * <p>The tasks merely set up the distributed stream coordination and the checkpointing.
* Internally, the tasks create one or more operators, perform the stream transformations.
*/
package org.apache.flink.streaming.runtime.tasks;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index ac9be39..e81a1c7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -38,18 +38,20 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
private static final long serialVersionUID = -5359448468131559102L;
- /** The serializer for the actual de-/serialization */
+ /** The serializer for the actual de-/serialization. */
private final TypeSerializer<T> serializer;
- /** The reusable output serialization buffer */
+ /** The reusable output serialization buffer. */
private transient DataOutputSerializer dos;
- /** The reusable input deserialization buffer */
+ /** The reusable input deserialization buffer. */
private transient DataInputDeserializer dis;
- /** The type information, to be returned by {@link #getProducedType()}. It is
- * transient, because it is not serializable. Note that this means that the type information
- * is not available at runtime, but only prior to the first serialization / deserialization */
+ /**
+ * The type information, to be returned by {@link #getProducedType()}. It is transient, because
+ * it is not serializable. Note that this means that the type information is not available at
+ * runtime, but only prior to the first serialization / deserialization.
+ */
private transient TypeInformation<T> typeInfo;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
index 21f477a..aef87d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
@@ -41,12 +41,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* These classes encapsulate the logic of accessing a field specified by the user as either an index
- * or a field expression string. TypeInformation can also be requested for the field.
- * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ * or a field expression string. TypeInformation can also be requested for the field. The position
+ * index might specify a field of a Tuple, an array, or a simple type (only "0th field").
*
- * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
- * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
- * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, RecursiveProductFieldAccessor)
+ * <p>Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field
+ * accessors. These penetrate one layer, and then delegate the rest of the work to an
+ * "innerAccessor". (see PojoFieldAccessor, RecursiveTupleFieldAccessor,
+ * RecursiveProductFieldAccessor)
*/
@Internal
public abstract class FieldAccessor<T, F> implements Serializable {
@@ -75,7 +76,7 @@ public abstract class FieldAccessor<T, F> implements Serializable {
/**
* Sets the field (specified in the constructor) of the given record to the given value.
*
- * Warning: This might modify the original object, or might return a new object instance.
+ * <p>Warning: This might modify the original object, or might return a new object instance.
* (This is necessary, because the record might be immutable.)
*
* @param record The record to modify
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
index 51e995b..28cdf9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index e9d63de..e5d3ef0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,7 +27,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -546,7 +547,11 @@ public class EvictingWindowOperatorTest {
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
- new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
+ new InternalIterableWindowFunction<>(
+ new ReduceApplyWindowFunction<>(
+ new SumReducer(),
+ // on some versions of Java we seem to need the explicit type
+ new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE),
0,
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
index 9633671..341171d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index d86c809..a21e58b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
index ab9e59b..3bd7adf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 3676953..9e26f9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index ff07fa2..a9d2ddf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 2167652..7ce4ab7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
index 9820ef8..1745c46 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
index bcb5691..05de3d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 4b6925d..b46ea66 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 90bdcb2..7d791c0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/tools/maven/strict-checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/strict-checkstyle.xml b/tools/maven/strict-checkstyle.xml
index ba2f4be..3bf9863 100644
--- a/tools/maven/strict-checkstyle.xml
+++ b/tools/maven/strict-checkstyle.xml
@@ -251,30 +251,30 @@ This file is based on the checkstyle file of Apache Beam.
<!-- Checks for Javadoc comments. -->
<!-- See http://checkstyle.sf.net/config_javadoc.html -->
- <!--<module name="JavadocMethod">-->
- <!--<property name="scope" value="protected"/>-->
- <!--<property name="severity" value="error"/>-->
- <!--<property name="allowMissingJavadoc" value="true"/>-->
- <!--<property name="allowMissingParamTags" value="true"/>-->
- <!--<property name="allowMissingReturnTag" value="true"/>-->
- <!--<property name="allowMissingThrowsTags" value="true"/>-->
- <!--<property name="allowThrowsTagsForSubclasses" value="true"/>-->
- <!--<property name="allowUndeclaredRTE" value="true"/>-->
- <!--</module>-->
+ <module name="JavadocMethod">
+ <property name="scope" value="protected"/>
+ <property name="severity" value="error"/>
+ <property name="allowMissingJavadoc" value="true"/>
+ <property name="allowMissingParamTags" value="true"/>
+ <property name="allowMissingReturnTag" value="true"/>
+ <property name="allowMissingThrowsTags" value="true"/>
+ <property name="allowThrowsTagsForSubclasses" value="true"/>
+ <property name="allowUndeclaredRTE" value="true"/>
+ </module>
- <!--<!– Check that paragraph tags are used correctly in Javadoc. –>-->
- <!--<module name="JavadocParagraph"/>-->
+ <!-- Check that paragraph tags are used correctly in Javadoc. -->
+ <module name="JavadocParagraph"/>
- <!--<module name="JavadocType">-->
- <!--<property name="scope" value="protected"/>-->
- <!--<property name="severity" value="error"/>-->
- <!--<property name="allowMissingParamTags" value="true"/>-->
- <!--</module>-->
+ <module name="JavadocType">
+ <property name="scope" value="protected"/>
+ <property name="severity" value="error"/>
+ <property name="allowMissingParamTags" value="true"/>
+ </module>
- <!--<module name="JavadocStyle">-->
- <!--<property name="severity" value="error"/>-->
- <!--<property name="checkHtml" value="true"/>-->
- <!--</module>-->
+ <module name="JavadocStyle">
+ <property name="severity" value="error"/>
+ <property name="checkHtml" value="true"/>
+ </module>
<!--