You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/18 08:08:24 UTC
[4/5] flink git commit: [hotfix] [core] Fix checkstyle in
org.apache.flink.api.common.functions
[hotfix] [core] Fix checkstyle in org.apache.flink.api.common.functions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9644df74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9644df74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9644df74
Branch: refs/heads/master
Commit: 9644df746e386e4be21d9786d3a98427c0b79e73
Parents: f41debe
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 16:59:55 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 18 10:05:39 2018 +0200
----------------------------------------------------------------------
.../common/functions/AbstractRichFunction.java | 16 +--
.../api/common/functions/AggregateFunction.java | 42 ++++----
.../functions/BroadcastVariableInitializer.java | 26 ++---
.../api/common/functions/CoGroupFunction.java | 30 +++---
.../api/common/functions/CombineFunction.java | 14 +--
.../api/common/functions/CrossFunction.java | 20 ++--
.../api/common/functions/FilterFunction.java | 22 ++--
.../api/common/functions/FlatJoinFunction.java | 34 +++---
.../api/common/functions/FlatMapFunction.java | 8 +-
.../api/common/functions/FoldFunction.java | 9 +-
.../flink/api/common/functions/Function.java | 2 +-
.../common/functions/GroupCombineFunction.java | 14 +--
.../common/functions/GroupReduceFunction.java | 24 ++---
.../common/functions/InvalidTypesException.java | 6 +-
.../functions/IterationRuntimeContext.java | 12 ++-
.../api/common/functions/JoinFunction.java | 28 ++---
.../flink/api/common/functions/MapFunction.java | 8 +-
.../common/functions/MapPartitionFunction.java | 19 ++--
.../api/common/functions/ReduceFunction.java | 16 +--
.../common/functions/RichAggregateFunction.java | 4 +-
.../common/functions/RichCoGroupFunction.java | 4 +-
.../api/common/functions/RichCrossFunction.java | 4 +-
.../common/functions/RichFilterFunction.java | 6 +-
.../common/functions/RichFlatJoinFunction.java | 2 +-
.../common/functions/RichFlatMapFunction.java | 2 +-
.../api/common/functions/RichFunction.java | 50 ++++-----
.../functions/RichGroupCombineFunction.java | 1 -
.../functions/RichGroupReduceFunction.java | 10 +-
.../api/common/functions/RichJoinFunction.java | 2 +-
.../api/common/functions/RichMapFunction.java | 4 +-
.../functions/RichMapPartitionFunction.java | 4 +-
.../common/functions/RichReduceFunction.java | 4 +-
.../api/common/functions/RuntimeContext.java | 42 ++++----
.../api/common/functions/StoppableFunction.java | 13 +--
.../util/AbstractRuntimeUDFContext.java | 18 ++--
.../common/functions/util/CopyingIterator.java | 13 ++-
.../functions/util/CopyingListCollector.java | 9 +-
.../common/functions/util/ListCollector.java | 6 +-
.../api/common/functions/util/NoOpFunction.java | 3 +
.../functions/util/RuntimeUDFContext.java | 32 +++---
.../functions/util/RuntimeUDFContextTest.java | 104 ++++++++++---------
tools/maven/suppressions-core.xml | 4 -
42 files changed, 355 insertions(+), 336 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index ff2cbea..f13e584 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -18,11 +18,11 @@ c * Licensed to the Apache Software Foundation (ASF) under one
package org.apache.flink.api.common.functions;
-import java.io.Serializable;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;
+import java.io.Serializable;
+
/**
* An abstract stub implementation for rich user-defined functions.
* Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
@@ -31,13 +31,13 @@ import org.apache.flink.configuration.Configuration;
*/
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
-
+
private static final long serialVersionUID = 1L;
-
+
// --------------------------------------------------------------------------------------------
// Runtime context access
// --------------------------------------------------------------------------------------------
-
+
private transient RuntimeContext runtimeContext;
@Override
@@ -53,7 +53,7 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
throw new IllegalStateException("The runtime context has not been initialized.");
}
}
-
+
@Override
public IterationRuntimeContext getIterationRuntimeContext() {
if (this.runtimeContext == null) {
@@ -64,11 +64,11 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
throw new IllegalStateException("This stub is not part of an iteration step function.");
}
}
-
+
// --------------------------------------------------------------------------------------------
// Default life cycle methods
// --------------------------------------------------------------------------------------------
-
+
@Override
public void open(Configuration parameters) throws Exception {}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
index 78b8d94..6ce20d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
@@ -25,11 +25,11 @@ import java.io.Serializable;
/**
* The {@code AggregateFunction} is a flexible aggregation function, characterized by the
* following features:
- *
+ *
* <ul>
* <li>The aggregates may use different types for input values, intermediate aggregates,
* and result type, to support a wide range of aggregation types.</li>
- *
+ *
* <li>Support for distributive aggregations: Different intermediate aggregates can be
* merged together, to allow for pre-aggregation/final-aggregation optimizations.</li>
* </ul>
@@ -39,47 +39,47 @@ import java.io.Serializable;
* obtained by finalizing the accumulator state. This supports aggregation functions where the
* intermediate state needs to be different than the aggregated values and the final result type,
* such as for example <i>average</i> (which typically keeps a count and sum).
- * Merging intermediate aggregates (partial aggregates) means merging the accumulators.
- *
+ * Merging intermediate aggregates (partial aggregates) means merging the accumulators.
+ *
* <p>The AggregationFunction itself is stateless. To allow a single AggregationFunction
* instance to maintain multiple aggregates (such as one aggregate per key), the
* AggregationFunction creates a new accumulator whenever a new aggregation is started.
- *
+ *
* <p>Aggregation functions must be {@link Serializable} because they are sent around
* between distributed processes during distributed execution.
- *
+ *
* <h1>Example: Average and Weighted Average</h1>
- *
+ *
* <pre>{@code
* // the accumulator, which holds the state of the in-flight aggregate
* public class AverageAccumulator {
* long count;
* long sum;
* }
- *
+ *
* // implementation of an aggregation function for an 'average'
* public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
- *
+ *
* public AverageAccumulator createAccumulator() {
* return new AverageAccumulator();
* }
- *
+ *
* public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
* a.count += b.count;
* a.sum += b.sum;
* return a;
* }
- *
+ *
* public void add(Integer value, AverageAccumulator acc) {
* acc.sum += value;
* acc.count++;
* }
- *
+ *
* public Double getResult(AverageAccumulator acc) {
* return acc.sum / (double) acc.count;
* }
* }
- *
+ *
* // implementation of a weighted average
* // this reuses the same accumulator type as the aggregate function for 'average'
* public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
@@ -104,7 +104,7 @@ import java.io.Serializable;
* }
* }
* }</pre>
- *
+ *
* @param <IN> The type of the values that are aggregated (input values)
* @param <ACC> The type of the accumulator (intermediate aggregate state).
* @param <OUT> The type of the aggregated result
@@ -114,10 +114,10 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
/**
* Creates a new accumulator, starting a new aggregate.
- *
+ *
* <p>The new accumulator is typically meaningless unless a value is added
* via {@link #add(Object, Object)}.
- *
+ *
* <p>The accumulator is the state of a running aggregation. When a program has multiple
* aggregates in progress (such as per key and window), the state (per key and window)
* is the size of the accumulator.
@@ -131,7 +131,7 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
* new accumulator value.
*
* <p>For efficiency, the input accumulator may be modified and returned.
- *
+ *
* @param value The value to add
* @param accumulator The accumulator to add the value to
*/
@@ -139,7 +139,7 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
/**
* Gets the result of the aggregation from the accumulator.
- *
+ *
* @param accumulator The accumulator of the aggregation
* @return The final aggregation result.
*/
@@ -147,14 +147,14 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
/**
* Merges two accumulators, returning an accumulator with the merged state.
- *
+ *
* <p>This function may reuse any of the given accumulators as the target for the merge
* and return that. The assumption is that the given accumulators will not be used any
* more after having been passed to this function.
- *
+ *
* @param a An accumulator to merge
* @param b Another accumulator to merge
- *
+ *
* @return The accumulator with the merged state
*/
ACC merge(ACC a, ACC b);
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
index 65c3c1f..57cdf10 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.Public;
* into another format during initialization. The transformed variable is shared
* among the parallel instances of a function inside one TaskManager, the
* same way as the plain broadcast variables (lists) are shared.
- *
+ *
* <p>The broadcast variable initializer will in many cases only be executed by one
* parallel function instance per TaskManager, when acquiring the broadcast variable
* for the first time inside that particular TaskManager. It is possible that a
@@ -33,10 +33,10 @@ import org.apache.flink.annotation.Public;
* the variables are not overlapping in their execution time; in such cases, it can
* happen that one function instance released the broadcast variable, and another
* function instance materializes it again.
- *
+ *
* <p>This is an example of how to use a broadcast variable initializer, transforming
* the shared list of values into a shared map.
- *
+ *
* <pre>{@code
* public class MyFunction extends RichMapFunction<Long, String> {
*
@@ -45,29 +45,29 @@ import org.apache.flink.annotation.Public;
* public void open(Configuration cfg) throws Exception {
* getRuntimeContext().getBroadcastVariableWithInitializer("mapvar",
* new BroadcastVariableInitializer<Tuple2<Long, String>, Map<Long, String>>() {
- *
+ *
* public Map<Long, String> initializeBroadcastVariable(Iterable<Tuple2<Long, String>> data) {
* Map<Long, String> map = new HashMap<>();
- *
+ *
* for (Tuple2<Long, String> t : data) {
* map.put(t.f0, t.f1);
* }
- *
+ *
* return map;
* }
* });
* }
- *
+ *
* public String map(Long value) {
* // replace the long by the String, based on the map
* return map.get(value);
* }
* }
- *
+ *
* }</pre>
- *
+ *
* @param <T> The type of the elements in the list of the original untransformed broadcast variable.
- * @param <O> The type of the transformed broadcast variable.
+ * @param <O> The type of the transformed broadcast variable.
*/
@Public
public interface BroadcastVariableInitializer<T, O> {
@@ -75,10 +75,10 @@ public interface BroadcastVariableInitializer<T, O> {
/**
* The method that reads the data elements from the broadcast variable and
* creates the transformed data structure.
- *
- * <p>The iterable with the data elements can be traversed only once, i.e.,
+ *
+ * <p>The iterable with the data elements can be traversed only once, i.e.,
* only the first call to {@code iterator()} will succeed.
- *
+ *
* @param data The sequence of elements in the broadcast variable.
* @return The transformed broadcast variable.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index cf2fdc5..ec083aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -18,47 +18,47 @@
package org.apache.flink.api.common.functions;
-import java.io.Serializable;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
+
/**
* The interface for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set
- * after a key and then "joining" the groups by calling this function with the two sets for each key.
+ * after a key and then "joining" the groups by calling this function with the two sets for each key.
* If a key is present in only one of the two inputs, it may be that one of the groups is empty.
- * <p>
- * The basic syntax for using CoGroup on two data sets is as follows:
+ *
+ * <p>The basic syntax for using CoGroup on two data sets is as follows:
* <pre>{@code
* DataSet<X> set1 = ...;
* DataSet<Y> set2 = ...;
- *
+ *
* set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());
* }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
+ * <p>Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
* with in empty input for the side of the data set that did not contain elements with that specific key.
- *
+ *
* @param <IN1> The data type of the first input data set.
* @param <IN2> The data type of the second input data set.
* @param <O> The data type of the returned elements.
*/
@Public
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
-
+
/**
* This method must be implemented to provide a user implementation of a
* coGroup. It is called for each pair of element groups where the elements share the
* same key.
- *
+ *
* @param first The records from the first input.
* @param second The records from the second.
* @param out A collector to return elements.
- *
+ *
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
- public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
+ void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index af05c0d..4ff90d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -26,13 +26,13 @@ import java.io.Serializable;
* Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
* and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
* only a sub-group.
- * <p>
- * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ *
+ * <p>Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
* reduce the data volume earlier, before the entire groups have been collected.
- * <p>
- * This special variant of the combine function reduces the group of elements into a single element. A variant
+ *
+ * <p>This special variant of the combine function reduces the group of elements into a single element. A variant
* that can return multiple values per group is defined in {@link GroupCombineFunction}.
- *
+ *
* @param <IN> The data type processed by the combine function.
* @param <OUT> The data type emitted by the combine function.
*/
@@ -41,10 +41,10 @@ public interface CombineFunction<IN, OUT> extends Function, Serializable {
/**
* The combine method, called (potentially multiple timed) with subgroups of elements.
- *
+ *
* @param values The elements to be combined.
* @return The single resulting value.
- *
+ *
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
index e29242f..f4dbe82 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -25,20 +25,20 @@ import java.io.Serializable;
/**
* Interface for Cross functions. Cross functions are applied to the Cartesian product
* of their inputs and are called for each pair of elements.
- *
- * They are optional, a means of convenience that can be used to directly manipulate the
+ *
+ * <p>They are optional, a means of convenience that can be used to directly manipulate the
* pair of elements instead of producing 2-tuples containing the pairs.
- * <p>
- * The basic syntax for using Cross on two data sets is as follows:
+ *
+ * <p>The basic syntax for using Cross on two data sets is as follows:
* <pre>{@code
* DataSet<X> set1 = ...;
* DataSet<Y> set2 = ...;
- *
+ *
* set1.cross(set2).with(new MyCrossFunction());
* }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- *
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
* @param <OUT> The type of the result elements.
@@ -48,11 +48,11 @@ public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
/**
* Cross UDF method. Called once per pair of elements in the Cartesian product of the inputs.
- *
+ *
* @param val1 Element from first input.
* @param val2 Element from the second input.
* @return The result element.
- *
+ *
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
index e8e83b6..a97715a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
@@ -25,33 +25,33 @@ import java.io.Serializable;
/**
* A filter function is a predicate applied individually to each record.
* The predicate decides whether to keep the element, or to discard it.
- * <p>
- * The basic syntax for using a FilterFunction is as follows:
+ *
+ * <p>The basic syntax for using a FilterFunction is as follows:
* <pre>{@code
* DataSet<X> input = ...;
- *
+ *
* DataSet<X> result = input.filter(new MyFilterFunction());
* }</pre>
- * <p>
- * <strong>IMPORTANT:</strong> The system assumes that the function does not
+ *
+ * <p><strong>IMPORTANT:</strong> The system assumes that the function does not
* modify the elements on which the predicate is applied. Violating this assumption
* can lead to incorrect results.
- *
+ *
* @param <T> The type of the filtered elements.
*/
@Public
public interface FilterFunction<T> extends Function, Serializable {
-
+
/**
* The filter function that evaluates the predicate.
- * <p>
- * <strong>IMPORTANT:</strong> The system assumes that the function does not
+ *
+ * <p><strong>IMPORTANT:</strong> The system assumes that the function does not
* modify the elements on which the predicate is applied. Violating this assumption
* can lead to incorrect results.
- *
+ *
* @param value The value to be filtered.
* @return True for values that should be retained, false for values to be filtered out.
- *
+ *
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
index 5d7c0ee..f6ae8b3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
@@ -26,30 +26,30 @@ import java.io.Serializable;
/**
* Interface for Join functions. Joins combine two data sets by joining their
* elements on specified keys. This function is called with each pair of joining elements.
- * <p>
- * This particular variant of the join function supports to return zero, one, or more result values
- * per pair of joining values.
- * <p>
- * By default, the joins follows strictly the semantics of an "inner join" in SQL.
+ *
+ * <p>This particular variant of the join function supports to return zero, one, or more result values
+ * per pair of joining values.
+ *
+ * <p>By default, the joins follows strictly the semantics of an "inner join" in SQL.
* the semantics are those of an "inner join", meaning that elements are filtered out
* if their key is not contained in the other data set.
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
+ *
+ * <p>The basic syntax for using Join on two data sets is as follows:
* <pre>{@code
* DataSet<X> set1 = ...;
* DataSet<Y> set2 = ...;
- *
+ *
* set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
* }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
+ * <p>The Join function is an optional part of a join operation. If no JoinFunction is provided,
* the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
* the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link CoGroupFunction} to perform an outer join.
- *
+ *
+ * <p>Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ *
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
* @param <OUT> The type of the result elements.
@@ -59,11 +59,11 @@ public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable
/**
* The join method, called once per joined pair of elements.
- *
+ *
* @param first The element from first input.
* @param second The element from second input.
* @param out The collector used to return zero, one, or more elements.
- *
+ *
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
index 8b4a8f2..a5211e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -28,14 +28,14 @@ import java.io.Serializable;
* into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
* and arrays. Operations that produce multiple strictly one result element per input element can also
* use the {@link MapFunction}.
- * <p>
- * The basic syntax for using a FlatMapFunction is as follows:
+ *
+ * <p>The basic syntax for using a FlatMapFunction is as follows:
* <pre>{@code
* DataSet<X> input = ...;
- *
+ *
* DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
* }</pre>
- *
+ *
* @param <T> Type of the input elements.
* @param <O> Type of the returned elements.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index b3fd700..5e41fdb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -25,16 +25,16 @@ import java.io.Serializable;
/**
* Base interface for Fold functions. Fold functions combine groups of elements to
* a single value, by applying a binary operation to an initial accumulator element every element from a group elements.
- * <p>
- * The basic syntax for using a FoldFunction is as follows:
+ *
+ * <p>The basic syntax for using a FoldFunction is as follows:
* <pre>{@code
* DataSet<X> input = ...;
*
* X initialValue = ...;
* DataSet<X> result = input.fold(new MyFoldFunction(), initialValue);
* }</pre>
- * <p>
- * Like all functions, the FoldFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ *
+ * <p>Like all functions, the FoldFunction needs to be serializable, as defined in {@link java.io.Serializable}.
*
* @param <T> Type of the initial input and the returned element
* @param <O> Type of the elements that the group/list/stream contains
@@ -44,6 +44,7 @@ import java.io.Serializable;
@Public
@Deprecated
public interface FoldFunction<O, T> extends Function, Serializable {
+
/**
* The core method of FoldFunction, combining two values into one value of the same type.
* The fold function is consecutively applied to all values of a group until only a single value remains.
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index 86c42f8..17ceba0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Public;
/**
* The base interface for all user-defined functions.
- *
+ *
* <p>This interface is empty in order to allow extending interfaces to
* be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.</p>
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
index 53e7ade..0294506 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
@@ -18,22 +18,22 @@
package org.apache.flink.api.common.functions;
-import java.io.Serializable;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
+
/**
* Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
* and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
* only a sub-group.
- * <p>
- * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ *
+ * <p>Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
* reduce the data volume earlier, before the entire groups have been collected.
- * <p>
- * This special variant of the combine function supports to return more than one element per group.
+ *
+ * <p>This special variant of the combine function supports to return more than one element per group.
* It is frequently less efficient to use than the {@link CombineFunction}.
- *
+ *
* @param <IN> The data type processed by the combine function.
* @param <OUT> The data type emitted by the combine function.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 42b8e0c..7d0dc46 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -18,41 +18,41 @@
package org.apache.flink.api.common.functions;
-import java.io.Serializable;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
+
/**
* The interface for group reduce functions. GroupReduceFunctions process groups of elements.
* They may aggregate them to a single value, or produce multiple result values for each group.
* The group may be defined by sharing a common grouping key, or the group may simply be
* all elements of a data set.
- * <p>
- * For a reduce functions that works incrementally by combining always two elements, see
+ *
+ * <p>For a reduce functions that works incrementally by combining always two elements, see
* {@link ReduceFunction}.
- * <p>
- * The basic syntax for using a grouped GroupReduceFunction is as follows:
+ *
+ * <p>The basic syntax for using a grouped GroupReduceFunction is as follows:
* <pre>{@code
* DataSet<X> input = ...;
- *
+ *
* DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction());
* }</pre>
*
- * Partial computation can significantly improve the performance of a {@link GroupReduceFunction}.
+ * <p>Partial computation can significantly improve the performance of a {@link GroupReduceFunction}.
* This technique is also known as applying a Combiner.
- * Implement the {@link GroupCombineFunction<T, T>} interface to enable partial computations, i.e.,
+ * Implement the {@link GroupCombineFunction} interface to enable partial computations, i.e.,
* a combiner for this {@link GroupReduceFunction}.
- *
+ *
* @param <T> Type of the elements that this function processes.
* @param <O> The type of the elements returned by the user-defined function.
*/
@Public
public interface GroupReduceFunction<T, O> extends Function, Serializable {
-
+
/**
* The reduce method. The function receives one call per group of elements.
- *
+ *
* @param values All records that belong to the given input key.
* @param out The collector to hand results to.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
index 143180f..ce1fb48 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.InvalidProgramException;
/**
* A special case of the {@link InvalidProgramException}, indicating that the types used in
- * an operation are invalid or inconsistent.
+ * an operation are invalid or inconsistent.
*/
@Public
public class InvalidTypesException extends InvalidProgramException {
@@ -39,13 +39,13 @@ public class InvalidTypesException extends InvalidProgramException {
/**
* Creates a new exception with the given message.
- *
+ *
* @param message The exception message.
*/
public InvalidTypesException(String message) {
super(message);
}
-
+
public InvalidTypesException(String message, Throwable e) {
super(message, e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
index 0019c32..a6cadd2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
@@ -18,23 +18,27 @@
package org.apache.flink.api.common.functions;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.types.Value;
+/**
+ * A specialization of the {@link RuntimeContext} available in iterative computations of the
+ * DataSet API.
+ */
@Public
public interface IterationRuntimeContext extends RuntimeContext {
-
+
/**
* Gets the number of the current superstep. Superstep numbers start at <i>1</i>.
- *
+ *
* @return The number of the current superstep.
*/
int getSuperstepNumber();
@PublicEvolving
<T extends Aggregator<?>> T getIterationAggregator(String name);
-
+
<T extends Value> T getPreviousIterationAggregate(String name);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
index 5897b47..25f2cc6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
@@ -25,27 +25,27 @@ import java.io.Serializable;
/**
* Interface for Join functions. Joins combine two data sets by joining their
* elements on specified keys. This function is called with each pair of joining elements.
- * <p>
- * By default, the joins follows strictly the semantics of an "inner join" in SQL.
+ *
+ * <p>By default, the joins follows strictly the semantics of an "inner join" in SQL.
* the semantics are those of an "inner join", meaning that elements are filtered out
* if their key is not contained in the other data set.
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
+ *
+ * <p>The basic syntax for using Join on two data sets is as follows:
* <pre>{@code
* DataSet<X> set1 = ...;
* DataSet<Y> set2 = ...;
- *
+ *
* set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
* }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
+ * <p>The Join function is an optional part of a join operation. If no JoinFunction is provided,
* the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
* the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link CoGroupFunction} to perform an outer join.
- *
+ *
+ * <p>Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ *
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
* @param <OUT> The type of the result elements.
@@ -55,11 +55,11 @@ public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
/**
* The join method, called once per joined pair of elements.
- *
+ *
* @param first The element from first input.
* @param second The element from second input.
* @return The resulting element.
- *
+ *
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
index 9a0a45a..1017ab2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -28,14 +28,14 @@ import java.io.Serializable;
* Typical applications are parsing elements, converting data types, or projecting out fields.
* Operations that produce multiple result elements from a single input element can be implemented
* using the {@link FlatMapFunction}.
- * <p>
- * The basic syntax for using a MapFunction is as follows:
+ *
+ * <p>The basic syntax for using a MapFunction is as follows:
* <pre>{@code
* DataSet<X> input = ...;
- *
+ *
* DataSet<Y> result = input.map(new MyMapFunction());
* }</pre>
- *
+ *
* @param <T> Type of the input elements.
* @param <O> Type of the returned elements.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
index 4caaadd..c72c2d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
@@ -25,31 +25,32 @@ import java.io.Serializable;
/**
* Interface for "mapPartition" functions. A "mapPartition" function is called a single time per
- * data partition receives an Iterable with data elements of that partition. It may return an
+ * data partition receives an Iterable with data elements of that partition. It may return an
* arbitrary number of data elements.
- * <p>
- * This function is intended to provide enhanced flexibility in the processing of elements in a partition.
+ *
+ * <p>This function is intended to provide enhanced flexibility in the processing of elements in a partition.
* For most of the simple use cases, consider using the {@link MapFunction} or {@link FlatMapFunction}.
- * <p>
- * The basic syntax for a MapPartitionFunction is as follows:
+ *
+ * <p>The basic syntax for a MapPartitionFunction is as follows:
* <pre>{@code
* DataSet<X> input = ...;
- *
+ *
* DataSet<Y> result = input.mapPartition(new MyMapPartitionFunction());
* }</pre>
- *
+ *
* @param <T> Type of the input elements.
* @param <O> Type of the returned elements.
*/
@Public
public interface MapPartitionFunction<T, O> extends Function, Serializable {
-
+
/**
* A user-implemented function that modifies or transforms an incoming object.
*
* @param values All records for the mapper
* @param out The collector to hand results to.
- * @throws Exception
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
*/
void mapPartition(Iterable<T> values, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
index 3579680..76f2e49 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
@@ -27,21 +27,21 @@ import java.io.Serializable;
* a single value, by taking always two elements and combining them into one. Reduce functions
* may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced
* individually.
- * <p>
- * For a reduce functions that work on an entire group at the same time (such as the
+ *
+ * <p>For a reduce functions that work on an entire group at the same time (such as the
* MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}. In the general case,
* ReduceFunctions are considered faster, because they allow the system to use more efficient
* execution strategies.
- * <p>
- * The basic syntax for using a grouped ReduceFunction is as follows:
+ *
+ * <p>The basic syntax for using a grouped ReduceFunction is as follows:
* <pre>{@code
* DataSet<X> input = ...;
- *
+ *
* DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction());
* }</pre>
- * <p>
- * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
- *
+ *
+ * <p>Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ *
* @param <T> Type of the elements that this function processes.
*/
@Public
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
index dbaf639..285c1f4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving;
* {@link RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @see AggregateFunction
*
* @param <IN> The type of the values that are aggregated (input values)
@@ -33,7 +33,7 @@ import org.apache.flink.annotation.PublicEvolving;
* @param <OUT> The type of the aggregated result
*/
@PublicEvolving
-public abstract class RichAggregateFunction<IN, ACC, OUT>
+public abstract class RichAggregateFunction<IN, ACC, OUT>
extends AbstractRichFunction
implements AggregateFunction<IN, ACC, OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
index 517e512..6e23fcd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
* @param <OUT> The type of the result elements.
@@ -35,7 +35,7 @@ import org.apache.flink.util.Collector;
public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
-
+
@Override
public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
index dbeefd5..b4a8454 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
@@ -25,14 +25,14 @@ import org.apache.flink.annotation.Public;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
* @param <OUT> The type of the result elements.
*/
@Public
public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
-
+
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
index 4458038..c09e9e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
@@ -25,14 +25,14 @@ import org.apache.flink.annotation.Public;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <T> The type of the filtered elements.
*/
@Public
public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
-
+
private static final long serialVersionUID = 1L;
-
+
@Override
public abstract boolean filter(T value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
index 0e06d81..7c84ee8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
* @param <OUT> The type of the result elements.
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
index 94443b8..1fdc4b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <IN> Type of the input elements.
* @param <OUT> Type of the returned elements.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index c0f0de2..9c51b2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -28,39 +28,39 @@ import org.apache.flink.configuration.Configuration;
*/
@Public
public interface RichFunction extends Function {
-
+
/**
- * Initialization method for the function. It is called before the actual working methods
+ * Initialization method for the function. It is called before the actual working methods
* (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that
* are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
- * <p>
- * The configuration object passed to the function can be used for configuration and initialization.
+ *
+ * <p>The configuration object passed to the function can be used for configuration and initialization.
* The configuration contains all parameters that were configured on the function in the program
* composition.
- *
+ *
* <pre>{@code
* public class MyMapper extends FilterFunction<String> {
- *
+ *
* private String searchString;
- *
+ *
* public void open(Configuration parameters) {
* this.searchString = parameters.getString("foo");
* }
- *
+ *
* public boolean filter(String value) {
* return value.equals(searchString);
* }
* }
* }</pre>
- * <p>
- * By default, this method does nothing.
- *
- * @param parameters The configuration containing the parameters attached to the contract.
- *
+ *
+ * <p>By default, this method does nothing.
+ *
+ * @param parameters The configuration containing the parameters attached to the contract.
+ *
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
- *
+ *
* @see org.apache.flink.configuration.Configuration
*/
void open(Configuration parameters) throws Exception;
@@ -69,28 +69,28 @@ public interface RichFunction extends Function {
* Tear-down method for the user code. It is called after the last call to the main working methods
* (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this method will
* be invoked after each iteration superstep.
- * <p>
- * This method can be used for clean up work.
- *
+ *
+ * <p>This method can be used for clean up work.
+ *
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
void close() throws Exception;
-
+
// ------------------------------------------------------------------------
// Runtime context
// ------------------------------------------------------------------------
-
+
/**
- * Gets the context that contains information about the UDF's runtime, such as the
+ * Gets the context that contains information about the UDF's runtime, such as the
* parallelism of the function, the subtask index of the function, or the name of
* the of the task that executes the function.
- *
+ *
* <p>The RuntimeContext also gives access to the
* {@link org.apache.flink.api.common.accumulators.Accumulator}s and the
* {@link org.apache.flink.api.common.cache.DistributedCache}.
- *
+ *
* @return The UDF's runtime context.
*/
RuntimeContext getRuntimeContext();
@@ -99,15 +99,15 @@ public interface RichFunction extends Function {
* Gets a specialized version of the {@link RuntimeContext}, which has additional information
* about the iteration in which the function is executed. This IterationRuntimeContext is only
* available if the function is part of an iteration. Otherwise, this method throws an exception.
- *
+ *
* @return The IterationRuntimeContext.
* @throws java.lang.IllegalStateException Thrown, if the function is not executed as part of an iteration.
*/
IterationRuntimeContext getIterationRuntimeContext();
-
+
/**
* Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.
- *
+ *
* @param t The runtime context.
*/
void setRuntimeContext(RuntimeContext t);
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
index de34031..673a8e8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.common.functions;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.util.Collector;
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index 2c74902..5646ead 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -27,20 +27,20 @@ import org.apache.flink.util.Collector;
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
*
- * Partial computation can significantly improve the performance of a {@link RichGroupReduceFunction}.
+ * <p>Partial computation can significantly improve the performance of a {@link RichGroupReduceFunction}.
* This technique is also known as applying a Combiner.
- * Implement the {@link GroupCombineFunction<IN, IN>} interface to enable partial computation, i.e.,
+ * Implement the {@link GroupCombineFunction} interface to enable partial computation, i.e.,
* a combiner for this {@link RichGroupReduceFunction}.
- *
+ *
* @param <IN> Type of the elements that this function processes.
* @param <OUT> The type of the elements returned by the user-defined function.
*/
@Public
public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT> {
-
+
private static final long serialVersionUID = 1L;
@Override
public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
index ceeca79..802d345 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
@@ -31,7 +31,7 @@ import org.apache.flink.annotation.Public;
* @param <OUT> The type of the result elements.
*/
@Public
-public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> {
+public abstract class RichJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction implements JoinFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
index 81fec5d..d4683c9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.Public;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <IN> Type of the input elements.
* @param <OUT> Type of the returned elements.
*/
@@ -33,7 +33,7 @@ import org.apache.flink.annotation.Public;
public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
private static final long serialVersionUID = 1L;
-
+
@Override
public abstract OUT map(IN value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
index b3b800a..d2781af 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <I> Type of the input elements.
* @param <O> Type of the returned elements.
*/
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
public abstract class RichMapPartitionFunction<I, O> extends AbstractRichFunction implements MapPartitionFunction<I, O> {
private static final long serialVersionUID = 1L;
-
+
@Override
public abstract void mapPartition(Iterable<I> values, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
index 7b42ec7..5dc16e6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
@@ -25,12 +25,12 @@ import org.apache.flink.annotation.Public;
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
- *
+ *
* @param <T> Type of the elements that this function processes.
*/
@Public
public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {
-
+
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index e221091..1a4a9a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -47,10 +47,10 @@ import java.util.Map;
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
- * of the function will have a context through which it can access static contextual information (such as
+ * of the function will have a context through which it can access static contextual information (such as
* the current parallelism) and other constructs like accumulators and broadcast variables.
- * <p>
- * A function can, during runtime, obtain the RuntimeContext via a call to
+ *
+ * <p>A function can, during runtime, obtain the RuntimeContext via a call to
* {@link AbstractRichFunction#getRuntimeContext()}.
*/
@Public
@@ -58,14 +58,14 @@ public interface RuntimeContext {
/**
* Returns the name of the task in which the UDF runs, as assigned during plan construction.
- *
+ *
* @return The name of the task in which the UDF runs.
*/
String getTaskName();
/**
* Returns the metric group for this parallel subtask.
- *
+ *
* @return The metric group for this parallel subtask.
*/
@PublicEvolving
@@ -73,7 +73,7 @@ public interface RuntimeContext {
/**
* Gets the parallelism with which the parallel task runs.
- *
+ *
* @return The parallelism with which the parallel task runs.
*/
int getNumberOfParallelSubtasks();
@@ -89,7 +89,7 @@ public interface RuntimeContext {
/**
* Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
* parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
- *
+ *
* @return The index of the parallel subtask.
*/
int getIndexOfThisSubtask();
@@ -115,11 +115,11 @@ public interface RuntimeContext {
* job.
*/
ExecutionConfig getExecutionConfig();
-
+
/**
* Gets the ClassLoader to load classes that were are not in system's classpath, but are part of the
* jar file of a user job.
- *
+ *
* @return The ClassLoader for user code classes.
*/
ClassLoader getUserCodeClassLoader();
@@ -137,7 +137,7 @@ public interface RuntimeContext {
/**
* Get an existing accumulator object. The accumulator must have been added
* previously in this local runtime context.
- *
+ *
* Throws an exception if the accumulator does not exist or if the
* accumulator exists, but with different type.
*/
@@ -175,7 +175,7 @@ public interface RuntimeContext {
*/
@PublicEvolving
Histogram getHistogram(String name);
-
+
// --------------------------------------------------------------------------------------------
/**
@@ -189,27 +189,27 @@ public interface RuntimeContext {
boolean hasBroadcastVariable(String name);
/**
- * Returns the result bound to the broadcast variable identified by the
+ * Returns the result bound to the broadcast variable identified by the
* given {@code name}.
* <p>
* IMPORTANT: The broadcast variable data structure is shared between the parallel
* tasks on one machine. Any access that modifies its internal state needs to
* be manually synchronized by the caller.
- *
+ *
* @param name The name under which the broadcast variable is registered;
* @return The broadcast variable, materialized as a list of elements.
*/
<RT> List<RT> getBroadcastVariable(String name);
-
+
/**
- * Returns the result bound to the broadcast variable identified by the
+ * Returns the result bound to the broadcast variable identified by the
* given {@code name}. The broadcast variable is returned as a shared data structure
* that is initialized with the given {@link BroadcastVariableInitializer}.
* <p>
* IMPORTANT: The broadcast variable data structure is shared between the parallel
* tasks on one machine. Any access that modifies its internal state needs to
* be manually synchronized by the caller.
- *
+ *
* @param name The name under which the broadcast variable is registered;
* @param initializer The initializer that creates the shared data structure of the broadcast
* variable from the sequence of elements.
@@ -220,11 +220,11 @@ public interface RuntimeContext {
/**
* Returns the {@link DistributedCache} to get the local temporary file copies of files otherwise not
* locally accessible.
- *
+ *
* @return The distributed cache of the worker executing this instance.
*/
DistributedCache getDistributedCache();
-
+
// ------------------------------------------------------------------------
// Methods for accessing state
// ------------------------------------------------------------------------
@@ -280,7 +280,7 @@ public interface RuntimeContext {
* Gets a handle to the system's key/value list state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* holds lists. One can add elements to the list, or retrieve the list as a whole.
- *
+ *
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
@@ -345,7 +345,7 @@ public interface RuntimeContext {
* return new Tuple2<>(value, state.get());
* }
* });
- *
+ *
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the stats.
@@ -446,7 +446,7 @@ public interface RuntimeContext {
@PublicEvolving
@Deprecated
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
-
+
/**
* Gets a handle to the system's key/value map state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
index e6da8a8..07ef372 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
@@ -3,13 +3,14 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.PublicEvolving;
@@ -25,12 +26,12 @@ public interface StoppableFunction {
* Stops the source. In contrast to {@code cancel()} this is a request to the source function to shut down
* gracefully. Pending data can still be emitted and it is not required to stop immediately -- however, in the near
* future. The job will keep running until all emitted data is processed completely.
- * <p>
- * Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source
+ *
+ * <p>Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source
* will break out of this loop. This can be achieved by having a volatile field "isRunning" that is checked in the
* loop and that is set to false in this method.
- * <p>
- * <strong>The call to {@code stop()} should not block and not throw any exception.</strong>
+ *
+ * <p><strong>The call to {@code stop()} should not block and not throw any exception.</strong>
*/
void stop();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index d6262c7..f63c45e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -68,13 +68,13 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
private final Map<String, Accumulator<?, ?>> accumulators;
private final DistributedCache distributedCache;
-
+
private final MetricGroup metrics;
public AbstractRuntimeUDFContext(TaskInfo taskInfo,
ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
- Map<String, Accumulator<?,?>> accumulators,
+ Map<String, Accumulator<?, ?>> accumulators,
Map<String, Future<Path>> cpTasks,
MetricGroup metrics) {
this.taskInfo = checkNotNull(taskInfo);
@@ -164,23 +164,23 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
public Map<String, Accumulator<?, ?>> getAllAccumulators() {
return Collections.unmodifiableMap(this.accumulators);
}
-
+
@Override
public ClassLoader getUserCodeClassLoader() {
return this.userCodeClassLoader;
}
-
+
@Override
public DistributedCache getDistributedCache() {
return this.distributedCache;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@SuppressWarnings("unchecked")
private <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name,
- Class<? extends Accumulator<V, A>> accumulatorClass)
- {
+ Class<? extends Accumulator<V, A>> accumulatorClass) {
+
Accumulator<?, ?> accumulator = accumulators.get(name);
if (accumulator != null) {
@@ -233,7 +233,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
-
+
@Override
@PublicEvolving
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
index c5adb2e..0de1c4d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
@@ -18,20 +18,25 @@
package org.apache.flink.api.common.functions.util;
-import java.util.Iterator;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.TraversableOnceException;
+import java.util.Iterator;
+
+/**
+ * Wraps an iterator to return deep copies of the original iterator's elements.
+ *
+ * @param <E> The type of the element returned by the iterator.
+ */
@Internal
public class CopyingIterator<E> implements Iterator<E>, Iterable<E> {
private final Iterator<E> source;
private final TypeSerializer<E> serializer;
-
+
private boolean available = true;
-
+
public CopyingIterator(Iterator<E> source, TypeSerializer<E> serializer) {
this.source = source;
this.serializer = serializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
index 16b7bf3..bfbd7ad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
@@ -18,12 +18,17 @@
package org.apache.flink.api.common.functions.util;
-import java.util.List;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.Collector;
+import java.util.List;
+
+/**
+ * A {@link Collector} that collects deep copies of its elements in a list.
+ *
+ * @param <T> The type of the collected elements.
+ */
@Internal
public class CopyingListCollector<T> implements Collector<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
index c820640..8ffa886 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
@@ -18,14 +18,14 @@
package org.apache.flink.api.common.functions.util;
-import java.util.List;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Collector;
+import java.util.List;
+
/**
* A {@link Collector} that puts the collected elements into a given list.
- *
+ *
* @param <T> The type of the collected elements.
*/
@Internal
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
index 6351365..a96f566 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
@@ -21,6 +21,9 @@ package org.apache.flink.api.common.functions.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.AbstractRichFunction;
+/**
+ * An {@link AbstractRichFunction} that does nothing.
+ */
@Internal
public class NoOpFunction extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index ba3f85e..bab2eb0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -18,11 +18,6 @@
package org.apache.flink.api.common.functions.util;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
@@ -32,15 +27,20 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
/**
* A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
*/
@Internal
public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
- private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();
-
- private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
+ private final HashMap<String, Object> initializedBroadcastVars = new HashMap<>();
+
+ private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<>();
public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
@@ -56,7 +56,7 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
@Override
@SuppressWarnings("unchecked")
public <RT> List<RT> getBroadcastVariable(String name) {
-
+
// check if we have an initialized version
Object o = this.initializedBroadcastVars.get(name);
if (o != null) {
@@ -64,7 +64,7 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
return (List<RT>) o;
}
else {
- throw new IllegalStateException("The broadcast variable with name '" + name +
+ throw new IllegalStateException("The broadcast variable with name '" + name +
"' is not a List. A different call must have requested this variable with a BroadcastVariableInitializer.");
}
}
@@ -79,11 +79,11 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
}
}
}
-
+
@SuppressWarnings("unchecked")
@Override
public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-
+
// check if we have an initialized version
Object o = this.initializedBroadcastVars.get(name);
if (o != null) {
@@ -101,19 +101,19 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
}
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public void setBroadcastVariable(String name, List<?> value) {
this.uninitializedBroadcastVars.put(name, value);
this.initializedBroadcastVars.remove(name);
}
-
+
public void clearBroadcastVariable(String name) {
this.uninitializedBroadcastVars.remove(name);
this.initializedBroadcastVars.remove(name);
}
-
+
public void clearAllBroadcastVariables() {
this.uninitializedBroadcastVars.clear();
this.initializedBroadcastVars.clear();