Posted to commits@flink.apache.org by se...@apache.org on 2018/05/18 08:08:24 UTC

[4/5] flink git commit: [hotfix] [core] Fix checkstyle in org.apache.flink.api.common.functions

[hotfix] [core] Fix checkstyle in org.apache.flink.api.common.functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9644df74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9644df74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9644df74

Branch: refs/heads/master
Commit: 9644df746e386e4be21d9786d3a98427c0b79e73
Parents: f41debe
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 16:59:55 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 18 10:05:39 2018 +0200

----------------------------------------------------------------------
 .../common/functions/AbstractRichFunction.java  |  16 +--
 .../api/common/functions/AggregateFunction.java |  42 ++++----
 .../functions/BroadcastVariableInitializer.java |  26 ++---
 .../api/common/functions/CoGroupFunction.java   |  30 +++---
 .../api/common/functions/CombineFunction.java   |  14 +--
 .../api/common/functions/CrossFunction.java     |  20 ++--
 .../api/common/functions/FilterFunction.java    |  22 ++--
 .../api/common/functions/FlatJoinFunction.java  |  34 +++---
 .../api/common/functions/FlatMapFunction.java   |   8 +-
 .../api/common/functions/FoldFunction.java      |   9 +-
 .../flink/api/common/functions/Function.java    |   2 +-
 .../common/functions/GroupCombineFunction.java  |  14 +--
 .../common/functions/GroupReduceFunction.java   |  24 ++---
 .../common/functions/InvalidTypesException.java |   6 +-
 .../functions/IterationRuntimeContext.java      |  12 ++-
 .../api/common/functions/JoinFunction.java      |  28 ++---
 .../flink/api/common/functions/MapFunction.java |   8 +-
 .../common/functions/MapPartitionFunction.java  |  19 ++--
 .../api/common/functions/ReduceFunction.java    |  16 +--
 .../common/functions/RichAggregateFunction.java |   4 +-
 .../common/functions/RichCoGroupFunction.java   |   4 +-
 .../api/common/functions/RichCrossFunction.java |   4 +-
 .../common/functions/RichFilterFunction.java    |   6 +-
 .../common/functions/RichFlatJoinFunction.java  |   2 +-
 .../common/functions/RichFlatMapFunction.java   |   2 +-
 .../api/common/functions/RichFunction.java      |  50 ++++-----
 .../functions/RichGroupCombineFunction.java     |   1 -
 .../functions/RichGroupReduceFunction.java      |  10 +-
 .../api/common/functions/RichJoinFunction.java  |   2 +-
 .../api/common/functions/RichMapFunction.java   |   4 +-
 .../functions/RichMapPartitionFunction.java     |   4 +-
 .../common/functions/RichReduceFunction.java    |   4 +-
 .../api/common/functions/RuntimeContext.java    |  42 ++++----
 .../api/common/functions/StoppableFunction.java |  13 +--
 .../util/AbstractRuntimeUDFContext.java         |  18 ++--
 .../common/functions/util/CopyingIterator.java  |  13 ++-
 .../functions/util/CopyingListCollector.java    |   9 +-
 .../common/functions/util/ListCollector.java    |   6 +-
 .../api/common/functions/util/NoOpFunction.java |   3 +
 .../functions/util/RuntimeUDFContext.java       |  32 +++---
 .../functions/util/RuntimeUDFContextTest.java   | 104 ++++++++++---------
 tools/maven/suppressions-core.xml               |   4 -
 42 files changed, 355 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index ff2cbea..f13e584 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -18,11 +18,11 @@ c * Licensed to the Apache Software Foundation (ASF) under one
 
 package org.apache.flink.api.common.functions;
 
-import java.io.Serializable;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.Serializable;
+
 /**
  * An abstract stub implementation for rich user-defined functions.
  * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
@@ -31,13 +31,13 @@ import org.apache.flink.configuration.Configuration;
  */
 @Public
 public abstract class AbstractRichFunction implements RichFunction, Serializable {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Runtime context access
 	// --------------------------------------------------------------------------------------------
-	
+
 	private transient RuntimeContext runtimeContext;
 
 	@Override
@@ -53,7 +53,7 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
 			throw new IllegalStateException("The runtime context has not been initialized.");
 		}
 	}
-	
+
 	@Override
 	public IterationRuntimeContext getIterationRuntimeContext() {
 		if (this.runtimeContext == null) {
@@ -64,11 +64,11 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
 			throw new IllegalStateException("This stub is not part of an iteration step function.");
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Default life cycle methods
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void open(Configuration parameters) throws Exception {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
index 78b8d94..6ce20d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
@@ -25,11 +25,11 @@ import java.io.Serializable;
 /**
  * The {@code AggregateFunction} is a flexible aggregation function, characterized by the
  * following features:
- * 
+ *
  * <ul>
  *     <li>The aggregates may use different types for input values, intermediate aggregates,
  *         and result type, to support a wide range of aggregation types.</li>
- * 
+ *
  *     <li>Support for distributive aggregations: Different intermediate aggregates can be
  *         merged together, to allow for pre-aggregation/final-aggregation optimizations.</li>
  * </ul>
@@ -39,47 +39,47 @@ import java.io.Serializable;
  * obtained by finalizing the accumulator state. This supports aggregation functions where the
  * intermediate state needs to be different than the aggregated values and the final result type,
  * such as for example <i>average</i> (which typically keeps a count and sum).
- * Merging intermediate aggregates (partial aggregates) means merging the accumulators.  
- * 
+ * Merging intermediate aggregates (partial aggregates) means merging the accumulators.
+ *
  * <p>The AggregationFunction itself is stateless. To allow a single AggregationFunction
  * instance to maintain multiple aggregates (such as one aggregate per key), the
  * AggregationFunction creates a new accumulator whenever a new aggregation is started.
- * 
+ *
  * <p>Aggregation functions must be {@link Serializable} because they are sent around
  * between distributed processes during distributed execution.
- * 
+ *
  * <h1>Example: Average and Weighted Average</h1>
- * 
+ *
  * <pre>{@code
  * // the accumulator, which holds the state of the in-flight aggregate
  * public class AverageAccumulator {
  *     long count;
  *     long sum;
  * }
- * 
+ *
  * // implementation of an aggregation function for an 'average'
  * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
- * 
+ *
  *     public AverageAccumulator createAccumulator() {
  *         return new AverageAccumulator();
  *     }
- * 
+ *
  *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
  *         a.count += b.count;
  *         a.sum += b.sum;
  *         return a;
  *     }
- * 
+ *
  *     public void add(Integer value, AverageAccumulator acc) {
  *         acc.sum += value;
  *         acc.count++;
  *     }
- * 
+ *
  *     public Double getResult(AverageAccumulator acc) {
  *         return acc.sum / (double) acc.count;
  *     }
  * }
- * 
+ *
  * // implementation of a weighted average
  * // this reuses the same accumulator type as the aggregate function for 'average'
  * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
@@ -104,7 +104,7 @@ import java.io.Serializable;
  *     }
  * }
  * }</pre>
- * 
+ *
  * @param <IN>  The type of the values that are aggregated (input values)
  * @param <ACC> The type of the accumulator (intermediate aggregate state).
  * @param <OUT> The type of the aggregated result
@@ -114,10 +114,10 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
 
 	/**
 	 * Creates a new accumulator, starting a new aggregate.
-	 * 
+	 *
 	 * <p>The new accumulator is typically meaningless unless a value is added
 	 * via {@link #add(Object, Object)}.
-	 * 
+	 *
 	 * <p>The accumulator is the state of a running aggregation. When a program has multiple
 	 * aggregates in progress (such as per key and window), the state (per key and window)
 	 * is the size of the accumulator.
@@ -131,7 +131,7 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
 	 * new accumulator value.
 	 *
 	 * <p>For efficiency, the input accumulator may be modified and returned.
-	 * 
+	 *
 	 * @param value The value to add
 	 * @param accumulator The accumulator to add the value to
 	 */
@@ -139,7 +139,7 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
 
 	/**
 	 * Gets the result of the aggregation from the accumulator.
-	 * 
+	 *
 	 * @param accumulator The accumulator of the aggregation
 	 * @return The final aggregation result.
 	 */
@@ -147,14 +147,14 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
 
 	/**
 	 * Merges two accumulators, returning an accumulator with the merged state.
-	 * 
+	 *
 	 * <p>This function may reuse any of the given accumulators as the target for the merge
 	 * and return that. The assumption is that the given accumulators will not be used any
 	 * more after having been passed to this function.
-	 * 
+	 *
 	 * @param a An accumulator to merge
 	 * @param b Another accumulator to merge
-	 * 
+	 *
 	 * @return The accumulator with the merged state
 	 */
 	ACC merge(ACC a, ACC b);
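
The Average example from the Javadoc above can be driven directly as plain Java, which illustrates the accumulator/merge contract (a sketch only, assuming the Average and AverageAccumulator classes from that example):

// Assumes the Average / AverageAccumulator classes from the Javadoc example above.
public class AverageDriver {

	public static void main(String[] args) {
		Average avg = new Average();

		// two partial aggregates, e.g. produced by two parallel pre-aggregations
		AverageAccumulator partA = avg.createAccumulator();
		avg.add(1, partA);
		avg.add(2, partA);

		AverageAccumulator partB = avg.createAccumulator();
		avg.add(3, partB);

		// merging partial accumulators is what enables pre-aggregation/final-aggregation
		AverageAccumulator merged = avg.merge(partA, partB);
		System.out.println(avg.getResult(merged)); // prints 2.0
	}
}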

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
index 65c3c1f..57cdf10 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.Public;
  * into another format during initialization. The transformed variable is shared
  * among the parallel instances of a function inside one TaskManager, the
  * same way as the plain broadcast variables (lists) are shared.
- * 
+ *
  * <p>The broadcast variable initializer will in many cases only be executed by one
  * parallel function instance per TaskManager, when acquiring the broadcast variable
  * for the first time inside that particular TaskManager. It is possible that a
@@ -33,10 +33,10 @@ import org.apache.flink.annotation.Public;
  * the variables are not overlapping in their execution time; in such cases, it can
  * happen that one function instance released the broadcast variable, and another
  * function instance materializes it again.
- * 
+ *
  * <p>This is an example of how to use a broadcast variable initializer, transforming
  * the shared list of values into a shared map.
- * 
+ *
  * <pre>{@code
  * public class MyFunction extends RichMapFunction<Long, String> {
  *
@@ -45,29 +45,29 @@ import org.apache.flink.annotation.Public;
  *     public void open(Configuration cfg) throws Exception {
  *         getRuntimeContext().getBroadcastVariableWithInitializer("mapvar",
  *             new BroadcastVariableInitializer<Tuple2<Long, String>, Map<Long, String>>() {
- *             
+ *
  *                 public Map<Long, String> initializeBroadcastVariable(Iterable<Tuple2<Long, String>> data) {
  *                     Map<Long, String> map = new HashMap<>();
- *                     
+ *
  *                     for (Tuple2<Long, String> t : data) {
  *                         map.put(t.f0, t.f1);
  *                     }
- *                     
+ *
  *                     return map;
  *                 }
  *             });
  *     }
- *     
+ *
  *     public String map(Long value) {
  *         // replace the long by the String, based on the map
  *         return map.get(value);
  *     }
  * }
- * 
+ *
  * }</pre>
- * 
+ *
  * @param <T> The type of the elements in the list of the original untransformed broadcast variable.
- * @param <O> The type of the transformed broadcast variable. 
+ * @param <O> The type of the transformed broadcast variable.
  */
 @Public
 public interface BroadcastVariableInitializer<T, O> {
@@ -75,10 +75,10 @@ public interface BroadcastVariableInitializer<T, O> {
 	/**
 	 * The method that reads the data elements from the broadcast variable and
 	 * creates the transformed data structure.
-	 * 
-	 * <p>The iterable with the data elements can be traversed only once, i.e., 
+	 *
+	 * <p>The iterable with the data elements can be traversed only once, i.e.,
 	 * only the first call to {@code iterator()} will succeed.
-	 * 
+	 *
 	 * @param data The sequence of elements in the broadcast variable.
 	 * @return The transformed broadcast variable.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index cf2fdc5..ec083aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -18,47 +18,47 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.io.Serializable;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
 /**
  * The interface for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set
- * after a key and then "joining" the groups by calling this function with the two sets for each key. 
+ * after a key and then "joining" the groups by calling this function with the two sets for each key.
  * If a key is present in only one of the two inputs, it may be that one of the groups is empty.
- * <p>
- * The basic syntax for using CoGroup on two data sets is as follows:
+ *
+ * <p>The basic syntax for using CoGroup on two data sets is as follows:
  * <pre>{@code
  * DataSet<X> set1 = ...;
  * DataSet<Y> set2 = ...;
- * 
+ *
  * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());
  * }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
+ * <p>Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
  * with in empty input for the side of the data set that did not contain elements with that specific key.
- * 
+ *
  * @param <IN1> The data type of the first input data set.
  * @param <IN2> The data type of the second input data set.
  * @param <O> The data type of the returned elements.
  */
 @Public
 public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
-	
+
 	/**
 	 * This method must be implemented to provide a user implementation of a
 	 * coGroup. It is called for each pair of element groups where the elements share the
 	 * same key.
-	 * 
+	 *
 	 * @param first The records from the first input.
 	 * @param second The records from the second.
 	 * @param out A collector to return elements.
-	 * 
+	 *
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */
-	public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
+	void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
 }
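
A minimal sketch of a CoGroupFunction that copes with an empty group on one side, as described above (names and types are illustrative only, not part of this commit):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/** Hypothetical sketch: summarize how many "orders" each "name" has per key. */
public class NamesWithOrders implements CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, Integer>, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public void coGroup(Iterable<Tuple2<Long, String>> first,
			Iterable<Tuple2<Long, Integer>> second,
			Collector<String> out) {
		// buffer one side; the runtime's iterables may only be traversable once
		List<Tuple2<Long, Integer>> orders = new ArrayList<>();
		for (Tuple2<Long, Integer> order : second) {
			orders.add(order);
		}
		for (Tuple2<Long, String> name : first) {
			if (orders.isEmpty()) {
				// the group for this key may be empty on one side
				out.collect(name.f1 + ": no orders");
			} else {
				out.collect(name.f1 + ": " + orders.size() + " orders");
			}
		}
	}
}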

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index af05c0d..4ff90d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -26,13 +26,13 @@ import java.io.Serializable;
  * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
  * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
  * only a sub-group.
- * <p>
- * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ *
+ * <p>Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
  * reduce the data volume earlier, before the entire groups have been collected.
- * <p>
- * This special variant of the combine function reduces the group of elements into a single element. A variant
+ *
+ * <p>This special variant of the combine function reduces the group of elements into a single element. A variant
  * that can return multiple values per group is defined in {@link GroupCombineFunction}.
- * 
+ *
  * @param <IN> The data type processed by the combine function.
  * @param <OUT> The data type emitted by the combine function.
  */
@@ -41,10 +41,10 @@ public interface CombineFunction<IN, OUT> extends Function, Serializable {
 
 	/**
 	 * The combine method, called (potentially multiple timed) with subgroups of elements.
-	 * 
+	 *
 	 * @param values The elements to be combined.
 	 * @return The single resulting value.
-	 * 
+	 *
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
index e29242f..f4dbe82 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -25,20 +25,20 @@ import java.io.Serializable;
 /**
  * Interface for Cross functions. Cross functions are applied to the Cartesian product
  * of their inputs and are called for each pair of elements.
- * 
- * They are optional, a means of convenience that can be used to directly manipulate the
+ *
+ * <p>They are optional, a means of convenience that can be used to directly manipulate the
  * pair of elements instead of producing 2-tuples containing the pairs.
- * <p>
- * The basic syntax for using Cross on two data sets is as follows:
+ *
+ * <p>The basic syntax for using Cross on two data sets is as follows:
  * <pre>{@code
  * DataSet<X> set1 = ...;
  * DataSet<Y> set2 = ...;
- * 
+ *
  * set1.cross(set2).with(new MyCrossFunction());
  * }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * 
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
  * @param <OUT> The type of the result elements.
@@ -48,11 +48,11 @@ public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
 
 	/**
 	 * Cross UDF method. Called once per pair of elements in the Cartesian product of the inputs.
-	 * 
+	 *
 	 * @param val1 Element from first input.
 	 * @param val2 Element from the second input.
 	 * @return The result element.
-	 * 
+	 *
 	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
 	 *                   and may trigger the recovery logic.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
index e8e83b6..a97715a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
@@ -25,33 +25,33 @@ import java.io.Serializable;
 /**
  * A filter function is a predicate applied individually to each record.
  * The predicate decides whether to keep the element, or to discard it.
- * <p>
- * The basic syntax for using a FilterFunction is as follows:
+ *
+ * <p>The basic syntax for using a FilterFunction is as follows:
  * <pre>{@code
  * DataSet<X> input = ...;
- * 
+ *
  * DataSet<X> result = input.filter(new MyFilterFunction());
  * }</pre>
- * <p>
- * <strong>IMPORTANT:</strong> The system assumes that the function does not
+ *
+ * <p><strong>IMPORTANT:</strong> The system assumes that the function does not
  * modify the elements on which the predicate is applied. Violating this assumption
  * can lead to incorrect results.
- * 
+ *
  * @param <T> The type of the filtered elements.
  */
 @Public
 public interface FilterFunction<T> extends Function, Serializable {
-	
+
 	/**
 	 * The filter function that evaluates the predicate.
-	 * <p>
-	 * <strong>IMPORTANT:</strong> The system assumes that the function does not
+	 *
+	 * <p><strong>IMPORTANT:</strong> The system assumes that the function does not
 	 * modify the elements on which the predicate is applied. Violating this assumption
 	 * can lead to incorrect results.
-	 * 
+	 *
 	 * @param value The value to be filtered.
 	 * @return True for values that should be retained, false for values to be filtered out.
-	 * 
+	 *
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */
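
A minimal FilterFunction sketch illustrating the contract above, in particular that the element must not be modified (illustrative only, not part of this commit):

import org.apache.flink.api.common.functions.FilterFunction;

/** Hypothetical sketch: keep only non-empty strings, without mutating the input. */
public class NonEmptyFilter implements FilterFunction<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public boolean filter(String value) {
		// must not modify 'value'; only decide whether to keep it
		return value != null && !value.isEmpty();
	}
}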

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
index 5d7c0ee..f6ae8b3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
@@ -26,30 +26,30 @@ import java.io.Serializable;
 /**
  * Interface for Join functions. Joins combine two data sets by joining their
  * elements on specified keys. This function is called with each pair of joining elements.
- * <p>
- * This particular variant of the join function supports to return zero, one, or more result values
- * per pair of joining values. 
- * <p>
- * By default, the joins follows strictly the semantics of an "inner join" in SQL.
+ *
+ * <p>This particular variant of the join function supports to return zero, one, or more result values
+ * per pair of joining values.
+ *
+ * <p>By default, the joins follows strictly the semantics of an "inner join" in SQL.
  * the semantics are those of an "inner join", meaning that elements are filtered out
  * if their key is not contained in the other data set.
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
+ *
+ * <p>The basic syntax for using Join on two data sets is as follows:
  * <pre>{@code
  * DataSet<X> set1 = ...;
  * DataSet<Y> set2 = ...;
- * 
+ *
  * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
  * }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
+ * <p>The Join function is an optional part of a join operation. If no JoinFunction is provided,
  * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
  * the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link CoGroupFunction} to perform an outer join.
- * 
+ *
+ * <p>Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ *
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
  * @param <OUT> The type of the result elements.
@@ -59,11 +59,11 @@ public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable
 
 	/**
 	 * The join method, called once per joined pair of elements.
-	 * 
+	 *
 	 * @param first The element from first input.
 	 * @param second The element from second input.
 	 * @param out The collector used to return zero, one, or more elements.
-	 * 
+	 *
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
index 8b4a8f2..a5211e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -28,14 +28,14 @@ import java.io.Serializable;
  * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
  * and arrays. Operations that produce multiple strictly one result element per input element can also
  * use the {@link MapFunction}.
- * <p>
- * The basic syntax for using a FlatMapFunction is as follows:
+ *
+ * <p>The basic syntax for using a FlatMapFunction is as follows:
  * <pre>{@code
  * DataSet<X> input = ...;
- * 
+ *
  * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
  * }</pre>
- * 
+ *
  * @param <T> Type of the input elements.
  * @param <O> Type of the returned elements.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index b3fd700..5e41fdb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -25,16 +25,16 @@ import java.io.Serializable;
 /**
  * Base interface for Fold functions. Fold functions combine groups of elements to
  * a single value, by applying a binary operation to an initial accumulator element every element from a group elements.
- * <p>
- * The basic syntax for using a FoldFunction is as follows:
+ *
+ * <p>The basic syntax for using a FoldFunction is as follows:
  * <pre>{@code
  * DataSet<X> input = ...;
  *
  * X initialValue = ...;
  * DataSet<X> result = input.fold(new MyFoldFunction(), initialValue);
  * }</pre>
- * <p>
- * Like all functions, the FoldFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ *
+ * <p>Like all functions, the FoldFunction needs to be serializable, as defined in {@link java.io.Serializable}.
  *
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
@@ -44,6 +44,7 @@ import java.io.Serializable;
 @Public
 @Deprecated
 public interface FoldFunction<O, T> extends Function, Serializable {
+
 	/**
 	 * The core method of FoldFunction, combining two values into one value of the same type.
 	 * The fold function is consecutively applied to all values of a group until only a single value remains.

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index 86c42f8..17ceba0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Public;
 
 /**
  * The base interface for all user-defined functions.
- * 
+ *
  * <p>This interface is empty in order to allow extending interfaces to
  * be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.</p>
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
index 53e7ade..0294506 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.io.Serializable;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
 /**
  * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
  * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
  * only a sub-group.
- * <p>
- * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ *
+ * <p>Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
  * reduce the data volume earlier, before the entire groups have been collected.
- * <p>
- * This special variant of the combine function supports to return more than one element per group.
+ *
+ * <p>This special variant of the combine function supports to return more than one element per group.
  * It is frequently less efficient to use than the {@link CombineFunction}.
- * 
+ *
  * @param <IN> The data type processed by the combine function.
  * @param <OUT> The data type emitted by the combine function.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 42b8e0c..7d0dc46 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -18,41 +18,41 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.io.Serializable;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
 /**
  * The interface for group reduce functions. GroupReduceFunctions process groups of elements.
  * They may aggregate them to a single value, or produce multiple result values for each group.
  * The group may be defined by sharing a common grouping key, or the group may simply be
  * all elements of a data set.
- * <p>
- * For a reduce functions that works incrementally by combining always two elements, see 
+ *
+ * <p>For a reduce functions that works incrementally by combining always two elements, see
  * {@link ReduceFunction}.
- * <p>
- * The basic syntax for using a grouped GroupReduceFunction is as follows:
+ *
+ * <p>The basic syntax for using a grouped GroupReduceFunction is as follows:
  * <pre>{@code
  * DataSet<X> input = ...;
- * 
+ *
  * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction());
  * }</pre>
  *
- * Partial computation can significantly improve the performance of a {@link GroupReduceFunction}.
+ * <p>Partial computation can significantly improve the performance of a {@link GroupReduceFunction}.
  * This technique is also known as applying a Combiner.
- * Implement the {@link GroupCombineFunction<T, T>} interface to enable partial computations, i.e.,
+ * Implement the {@link GroupCombineFunction} interface to enable partial computations, i.e.,
  * a combiner for this {@link GroupReduceFunction}.
- * 
+ *
  * @param <T> Type of the elements that this function processes.
  * @param <O> The type of the elements returned by the user-defined function.
  */
 @Public
 public interface GroupReduceFunction<T, O> extends Function, Serializable {
-	
+
 	/**
 	 * The reduce method. The function receives one call per group of elements.
-	 * 
+	 *
 	 * @param values All records that belong to the given input key.
 	 * @param out The collector to hand results to.
 	 *
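
A sketch of a GroupReduceFunction that also implements GroupCombineFunction, so the system can pre-aggregate sub-groups as described above (types and names are illustrative only, not part of this commit):

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/** Hypothetical sketch: sum counts per key, with a combiner for partial sums. */
public class SumCounts
		implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
			GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

	private static final long serialVersionUID = 1L;

	@Override
	public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
		String key = null;
		int sum = 0;
		for (Tuple2<String, Integer> value : values) {
			key = value.f0;
			sum += value.f1;
		}
		out.collect(Tuple2.of(key, sum));
	}

	@Override
	public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
		// partial sums have the same shape as the final result, so reuse the reduce logic
		reduce(values, out);
	}
}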

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
index 143180f..ce1fb48 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 
 /**
  * A special case of the {@link InvalidProgramException}, indicating that the types used in
- * an operation are invalid or inconsistent. 
+ * an operation are invalid or inconsistent.
  */
 @Public
 public class InvalidTypesException extends InvalidProgramException {
@@ -39,13 +39,13 @@ public class InvalidTypesException extends InvalidProgramException {
 
 	/**
 	 * Creates a new exception with the given message.
-	 * 
+	 *
 	 * @param message The exception message.
 	 */
 	public InvalidTypesException(String message) {
 		super(message);
 	}
-	
+
 	public InvalidTypesException(String message, Throwable e) {
 		super(message, e);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
index 0019c32..a6cadd2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
@@ -18,23 +18,27 @@
 
 package org.apache.flink.api.common.functions;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.types.Value;
 
+/**
+ * A specialization of the {@link RuntimeContext} available in iterative computations of the
+ * DataSet API.
+ */
 @Public
 public interface IterationRuntimeContext extends RuntimeContext {
-	
+
 	/**
 	 * Gets the number of the current superstep. Superstep numbers start at <i>1</i>.
-	 * 
+	 *
 	 * @return The number of the current superstep.
 	 */
 	int getSuperstepNumber();
 
 	@PublicEvolving
 	<T extends Aggregator<?>> T getIterationAggregator(String name);
-	
+
 	<T extends Value> T getPreviousIterationAggregate(String name);
 }
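
A sketch of how a rich function running inside an iteration might read the superstep number via this context (illustrative only, not part of this commit; getIterationRuntimeContext() throws IllegalStateException outside an iteration):

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

/** Hypothetical sketch: query the current superstep at the start of each superstep. */
public class SuperstepAwareMapper extends RichMapFunction<Long, Long> {

	private static final long serialVersionUID = 1L;

	private transient int superstep;

	@Override
	public void open(Configuration parameters) {
		IterationRuntimeContext ctx = getIterationRuntimeContext();
		this.superstep = ctx.getSuperstepNumber();
	}

	@Override
	public Long map(Long value) {
		return value + superstep;
	}
}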

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
index 5897b47..25f2cc6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
@@ -25,27 +25,27 @@ import java.io.Serializable;
 /**
  * Interface for Join functions. Joins combine two data sets by joining their
  * elements on specified keys. This function is called with each pair of joining elements.
- * <p>
- * By default, the joins follows strictly the semantics of an "inner join" in SQL.
+ *
+ * <p>By default, the joins follows strictly the semantics of an "inner join" in SQL.
  * the semantics are those of an "inner join", meaning that elements are filtered out
  * if their key is not contained in the other data set.
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
+ *
+ * <p>The basic syntax for using Join on two data sets is as follows:
  * <pre>{@code
  * DataSet<X> set1 = ...;
  * DataSet<Y> set2 = ...;
- * 
+ *
  * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
  * }</pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ *
+ * <p>{@code set1} is here considered the first input, {@code set2} the second input.
+ *
+ * <p>The Join function is an optional part of a join operation. If no JoinFunction is provided,
  * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
  * the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link CoGroupFunction} to perform an outer join.
- * 
+ *
+ * <p>Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ *
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
  * @param <OUT> The type of the result elements.
@@ -55,11 +55,11 @@ public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
 
 	/**
 	 * The join method, called once per joined pair of elements.
-	 * 
+	 *
 	 * @param first The element from first input.
 	 * @param second The element from second input.
 	 * @return The resulting element.
-	 * 
+	 *
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */
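
A minimal JoinFunction sketch, returning exactly one result per matching pair (illustrative only, not part of this commit):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/** Hypothetical sketch: combine each joining pair into a single string. */
public class NameWithOrderCount
		implements JoinFunction<Tuple2<Long, String>, Tuple2<Long, Integer>, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String join(Tuple2<Long, String> name, Tuple2<Long, Integer> order) {
		// inner-join semantics: called only for pairs whose keys match
		return name.f1 + " -> " + order.f1;
	}
}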

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
index 9a0a45a..1017ab2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -28,14 +28,14 @@ import java.io.Serializable;
  * Typical applications are parsing elements, converting data types, or projecting out fields.
  * Operations that produce multiple result elements from a single input element can be implemented
  * using the {@link FlatMapFunction}.
- * <p>
- * The basic syntax for using a MapFunction is as follows:
+ *
+ * <p>The basic syntax for using a MapFunction is as follows:
  * <pre>{@code
  * DataSet<X> input = ...;
- * 
+ *
  * DataSet<Y> result = input.map(new MyMapFunction());
  * }</pre>
- * 
+ *
  * @param <T> Type of the input elements.
  * @param <O> Type of the returned elements.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
index 4caaadd..c72c2d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
@@ -25,31 +25,32 @@ import java.io.Serializable;
 
 /**
  * Interface for "mapPartition" functions. A "mapPartition" function is called a single time per
- * data partition receives an Iterable with data elements of that partition. It may return an 
+ * data partition receives an Iterable with data elements of that partition. It may return an
  * arbitrary number of data elements.
- * <p>
- * This function is intended to provide enhanced flexibility in the processing of elements in a partition.
+ *
+ * <p>This function is intended to provide enhanced flexibility in the processing of elements in a partition.
  * For most of the simple use cases, consider using the {@link MapFunction} or {@link FlatMapFunction}.
- * <p>
- * The basic syntax for a MapPartitionFunction is as follows:
+ *
+ * <p>The basic syntax for a MapPartitionFunction is as follows:
  * <pre>{@code
  * DataSet<X> input = ...;
- * 
+ *
  * DataSet<Y> result = input.mapPartition(new MyMapPartitionFunction());
  * }</pre>
- * 
+ *
  * @param <T> Type of the input elements.
  * @param <O> Type of the returned elements.
  */
 @Public
 public interface MapPartitionFunction<T, O> extends Function, Serializable {
-	
+
 	/**
 	 * A user-implemented function that modifies or transforms an incoming object.
 	 *
 	 * @param values All records for the mapper
 	 * @param out The collector to hand results to.
-	 * @throws Exception
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
 	 */
 	void mapPartition(Iterable<T> values, Collector<O> out) throws Exception;
 }
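
A minimal MapPartitionFunction sketch, emitting a single count per partition rather than one value per element (illustrative only, not part of this commit):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

/** Hypothetical sketch: count the elements of one partition. */
public class PartitionCounter implements MapPartitionFunction<String, Long> {

	private static final long serialVersionUID = 1L;

	@Override
	public void mapPartition(Iterable<String> values, Collector<Long> out) {
		long count = 0;
		for (String ignored : values) {
			count++;
		}
		out.collect(count);
	}
}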

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
index 3579680..76f2e49 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
@@ -27,21 +27,21 @@ import java.io.Serializable;
  * a single value, by taking always two elements and combining them into one. Reduce functions
  * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced
  * individually.
- * <p>
- * For a reduce functions that work on an entire group at the same time (such as the 
+ *
+ * <p>For a reduce functions that work on an entire group at the same time (such as the
  * MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}. In the general case,
  * ReduceFunctions are considered faster, because they allow the system to use more efficient
  * execution strategies.
- * <p>
- * The basic syntax for using a grouped ReduceFunction is as follows:
+ *
+ * <p>The basic syntax for using a grouped ReduceFunction is as follows:
  * <pre>{@code
  * DataSet<X> input = ...;
- * 
+ *
  * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction());
  * }</pre>
- * <p>
- * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
- * 
+ *
+ * <p>Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ *
  * @param <T> Type of the elements that this function processes.
  */
 @Public
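
A minimal ReduceFunction sketch for the incremental, two-at-a-time combination described above (illustrative only, not part of this commit):

import org.apache.flink.api.common.functions.ReduceFunction;

/** Hypothetical sketch: sum values by repeatedly combining two into one. */
public class Sum implements ReduceFunction<Long> {

	private static final long serialVersionUID = 1L;

	@Override
	public Long reduce(Long a, Long b) {
		// applied repeatedly until a single value per group remains
		return a + b;
	}
}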

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
index dbaf639..285c1f4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving;
  * {@link RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @see AggregateFunction
  *
  * @param <IN>  The type of the values that are aggregated (input values)
@@ -33,7 +33,7 @@ import org.apache.flink.annotation.PublicEvolving;
  * @param <OUT> The type of the aggregated result
  */
 @PublicEvolving
-public abstract class RichAggregateFunction<IN, ACC, OUT> 
+public abstract class RichAggregateFunction<IN, ACC, OUT>
 		extends AbstractRichFunction
 		implements AggregateFunction<IN, ACC, OUT> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
index 517e512..6e23fcd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
  * @param <OUT> The type of the result elements.
@@ -35,7 +35,7 @@ import org.apache.flink.util.Collector;
 public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
 	public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
index dbeefd5..b4a8454 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
@@ -25,14 +25,14 @@ import org.apache.flink.annotation.Public;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
  * @param <OUT> The type of the result elements.
  */
 @Public
 public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
index 4458038..c09e9e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
@@ -25,14 +25,14 @@ import org.apache.flink.annotation.Public;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <T> The type of the filtered elements.
  */
 @Public
 public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
 	public abstract boolean filter(T value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
index 0e06d81..7c84ee8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
  * @param <OUT> The type of the result elements.

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
index 94443b8..1fdc4b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index c0f0de2..9c51b2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -28,39 +28,39 @@ import org.apache.flink.configuration.Configuration;
  */
 @Public
 public interface RichFunction extends Function {
-	
+
 	/**
-	 * Initialization method for the function. It is called before the actual working methods 
+	 * Initialization method for the function. It is called before the actual working methods
 	 * (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that
 	 * are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
-	 * <p>
-	 * The configuration object passed to the function can be used for configuration and initialization.
+	 *
+	 * <p>The configuration object passed to the function can be used for configuration and initialization.
 	 * The configuration contains all parameters that were configured on the function in the program
 	 * composition.
-	 * 
+	 *
 	 * <pre>{@code
 	 * public class MyMapper extends FilterFunction<String> {
-	 * 
+	 *
 	 *     private String searchString;
-	 *     
+	 *
 	 *     public void open(Configuration parameters) {
 	 *         this.searchString = parameters.getString("foo");
 	 *     }
-	 *     
+	 *
 	 *     public boolean filter(String value) {
 	 *         return value.equals(searchString);
 	 *     }
 	 * }
 	 * }</pre>
-	 * <p>
-	 * By default, this method does nothing.
-	 * 
-	 * @param parameters The configuration containing the parameters attached to the contract. 
-	 * 
+	 *
+	 * <p>By default, this method does nothing.
+	 *
+	 * @param parameters The configuration containing the parameters attached to the contract.
+	 *
 	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
 	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
 	 *                   decide whether to retry the task execution.
-	 * 
+	 *
 	 * @see org.apache.flink.configuration.Configuration
 	 */
 	void open(Configuration parameters) throws Exception;
@@ -69,28 +69,28 @@ public interface RichFunction extends Function {
 	 * Tear-down method for the user code. It is called after the last call to the main working methods
 	 * (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this method will
 	 * be invoked after each iteration superstep.
-	 * <p>
-	 * This method can be used for clean up work.
-	 * 
+	 *
+	 * <p>This method can be used for clean up work.
+	 *
 	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
 	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
 	 *                   decide whether to retry the task execution.
 	 */
 	void close() throws Exception;
-	
+
 	// ------------------------------------------------------------------------
 	//  Runtime context
 	// ------------------------------------------------------------------------
-	
+
 	/**
-	 * Gets the context that contains information about the UDF's runtime, such as the 
+	 * Gets the context that contains information about the UDF's runtime, such as the
 	 * parallelism of the function, the subtask index of the function, or the name of
 	 * the task that executes the function.
-	 * 
+	 *
 	 * <p>The RuntimeContext also gives access to the
 	 * {@link org.apache.flink.api.common.accumulators.Accumulator}s and the
 	 * {@link org.apache.flink.api.common.cache.DistributedCache}.
-	 * 
+	 *
 	 * @return The UDF's runtime context.
 	 */
 	RuntimeContext getRuntimeContext();
@@ -99,15 +99,15 @@ public interface RichFunction extends Function {
 	 * Gets a specialized version of the {@link RuntimeContext}, which has additional information
 	 * about the iteration in which the function is executed. This IterationRuntimeContext is only
 	 * available if the function is part of an iteration. Otherwise, this method throws an exception.
-	 * 
+	 *
 	 * @return The IterationRuntimeContext.
 	 * @throws java.lang.IllegalStateException Thrown, if the function is not executed as part of an iteration.
 	 */
 	IterationRuntimeContext getIterationRuntimeContext();
-	
+
 	/**
 	 * Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.
-	 *  
+	 *
 	 * @param t The runtime context.
 	 */
 	void setRuntimeContext(RuntimeContext t);

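To illustrate the lifecycle described above, here is a sketch of a rich function that reads a parameter in open(), queries the RuntimeContext, and leaves a hook for cleanup in close(); the class name and the "prefix" parameter key are assumptions made for the example.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

/**
 * Tags every record with a prefix combining a configured string and the subtask index.
 */
public class PrefixMapper extends RichMapFunction<String, String> {

	private static final long serialVersionUID = 1L;

	private transient String prefix;

	@Override
	public void open(Configuration parameters) throws Exception {
		// called once per parallel instance, before the first call to map()
		String configured = parameters.getString("prefix", "task");
		prefix = configured + "-" + getRuntimeContext().getIndexOfThisSubtask();
	}

	@Override
	public String map(String value) {
		return prefix + ": " + value;
	}

	@Override
	public void close() throws Exception {
		// called after the last call to map(); release held resources here if any
	}
}
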
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
index de34031..673a8e8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.common.functions;
 
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index 2c74902..5646ead 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -27,20 +27,20 @@ import org.apache.flink.util.Collector;
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
  *
- * Partial computation can significantly improve the performance of a {@link RichGroupReduceFunction}.
+ * <p>Partial computation can significantly improve the performance of a {@link RichGroupReduceFunction}.
  * This technique is also known as applying a Combiner.
- * Implement the {@link GroupCombineFunction<IN, IN>} interface to enable partial computation, i.e.,
+ * Implement the {@link GroupCombineFunction} interface to enable partial computation, i.e.,
  * a combiner for this {@link RichGroupReduceFunction}.
- * 
+ *
  * @param <IN> Type of the elements that this function processes.
  * @param <OUT> The type of the elements returned by the user-defined function.
  */
 @Public
 public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT> {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	@Override
 	public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
-	
+
 }

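The combiner technique mentioned in the Javadoc can be sketched roughly as follows: the same class implements both the group reduce and the GroupCombineFunction interface with matching types, so the runtime may pre-aggregate partial sums before the shuffle. The class name and tuple layout are illustrative only.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Sums the Integer field per group; combine() reuses reduce() because partial
 * sums have the same (key, sum) shape as the final result.
 */
public class SumPerKey
		extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
		implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

	private static final long serialVersionUID = 1L;

	@Override
	public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
		String key = null;
		int sum = 0;
		for (Tuple2<String, Integer> value : values) {
			key = value.f0;
			sum += value.f1;
		}
		out.collect(new Tuple2<>(key, sum));
	}

	@Override
	public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
		reduce(values, out);
	}
}
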
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
index ceeca79..802d345 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
@@ -31,7 +31,7 @@ import org.apache.flink.annotation.Public;
  * @param <OUT> The type of the result elements.
  */
 @Public
-public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> {
+public abstract class RichJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction implements JoinFunction<IN1, IN2, OUT> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
index 81fec5d..d4683c9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.Public;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
  */
@@ -33,7 +33,7 @@ import org.apache.flink.annotation.Public;
 public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
 	public abstract OUT map(IN value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
index b3b800a..d2781af 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <I> Type of the input elements.
  * @param <O> Type of the returned elements.
  */
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
 public abstract class RichMapPartitionFunction<I, O> extends AbstractRichFunction implements MapPartitionFunction<I, O> {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
 	public abstract void mapPartition(Iterable<I> values, Collector<O> out) throws Exception;
 }

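In contrast to a per-element map(), a mapPartition() implementation sees a whole partition at once. A minimal sketch, with the class name chosen only for illustration:

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.util.Collector;

/**
 * Emits one count per partition in a single pass over the partition's elements.
 */
public class PartitionCounter extends RichMapPartitionFunction<String, Long> {

	private static final long serialVersionUID = 1L;

	@Override
	public void mapPartition(Iterable<String> values, Collector<Long> out) {
		long count = 0;
		for (String ignored : values) {
			count++;
		}
		out.collect(count);
	}
}
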
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
index 7b42ec7..5dc16e6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
@@ -25,12 +25,12 @@ import org.apache.flink.annotation.Public;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
- * 
+ *
  * @param <T> Type of the elements that this function processes.
  */
 @Public
 public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index e221091..1a4a9a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -47,10 +47,10 @@ import java.util.Map;
 
 /**
  * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
- * of the function will have a context through which it can access static contextual information (such as 
+ * of the function will have a context through which it can access static contextual information (such as
  * the current parallelism) and other constructs like accumulators and broadcast variables.
- * <p>
- * A function can, during runtime, obtain the RuntimeContext via a call to
+ *
+ * <p>A function can, during runtime, obtain the RuntimeContext via a call to
  * {@link AbstractRichFunction#getRuntimeContext()}.
  */
 @Public
@@ -58,14 +58,14 @@ public interface RuntimeContext {
 
 	/**
 	 * Returns the name of the task in which the UDF runs, as assigned during plan construction.
-	 * 
+	 *
 	 * @return The name of the task in which the UDF runs.
 	 */
 	String getTaskName();
 
 	/**
 	 * Returns the metric group for this parallel subtask.
-	 * 
+	 *
 	 * @return The metric group for this parallel subtask.
 	 */
 	@PublicEvolving
@@ -73,7 +73,7 @@ public interface RuntimeContext {
 
 	/**
 	 * Gets the parallelism with which the parallel task runs.
-	 * 
+	 *
 	 * @return The parallelism with which the parallel task runs.
 	 */
 	int getNumberOfParallelSubtasks();
@@ -89,7 +89,7 @@ public interface RuntimeContext {
 	/**
 	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
 	 * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
-	 * 
+	 *
 	 * @return The index of the parallel subtask.
 	 */
 	int getIndexOfThisSubtask();
@@ -115,11 +115,11 @@ public interface RuntimeContext {
 	 * job.
 	 */
 	ExecutionConfig getExecutionConfig();
-	
+
 	/**
 	 * Gets the ClassLoader to load classes that are not in the system's classpath, but are part of the
 	 * jar file of a user job.
-	 * 
+	 *
 	 * @return The ClassLoader for user code classes.
 	 */
 	ClassLoader getUserCodeClassLoader();
@@ -137,7 +137,7 @@ public interface RuntimeContext {
 	/**
 	 * Get an existing accumulator object. The accumulator must have been added
 	 * previously in this local runtime context.
-	 * 
+	 *
 	 * Throws an exception if the accumulator does not exist or if the
 	 * accumulator exists, but with a different type.
 	 */
@@ -175,7 +175,7 @@ public interface RuntimeContext {
 	 */
 	@PublicEvolving
 	Histogram getHistogram(String name);
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -189,27 +189,27 @@ public interface RuntimeContext {
 	boolean hasBroadcastVariable(String name);
 
 	/**
-	 * Returns the result bound to the broadcast variable identified by the 
+	 * Returns the result bound to the broadcast variable identified by the
 	 * given {@code name}.
 	 * <p>
 	 * IMPORTANT: The broadcast variable data structure is shared between the parallel
 	 *            tasks on one machine. Any access that modifies its internal state needs to
 	 *            be manually synchronized by the caller.
-	 * 
+	 *
 	 * @param name The name under which the broadcast variable is registered;
 	 * @return The broadcast variable, materialized as a list of elements.
 	 */
 	<RT> List<RT> getBroadcastVariable(String name);
-	
+
 	/**
-	 * Returns the result bound to the broadcast variable identified by the 
+	 * Returns the result bound to the broadcast variable identified by the
 	 * given {@code name}. The broadcast variable is returned as a shared data structure
 	 * that is initialized with the given {@link BroadcastVariableInitializer}.
 	 * <p>
 	 * IMPORTANT: The broadcast variable data structure is shared between the parallel
 	 *            tasks on one machine. Any access that modifies its internal state needs to
 	 *            be manually synchronized by the caller.
-	 * 
+	 *
 	 * @param name The name under which the broadcast variable is registered;
 	 * @param initializer The initializer that creates the shared data structure of the broadcast
 	 *                    variable from the sequence of elements.
@@ -220,11 +220,11 @@ public interface RuntimeContext {
 	/**
 	 * Returns the {@link DistributedCache} to get the local temporary file copies of files otherwise not
 	 * locally accessible.
-	 * 
+	 *
 	 * @return The distributed cache of the worker executing this instance.
 	 */
 	DistributedCache getDistributedCache();
-	
+
 	// ------------------------------------------------------------------------
 	//  Methods for accessing state
 	// ------------------------------------------------------------------------
@@ -280,7 +280,7 @@ public interface RuntimeContext {
 	 * Gets a handle to the system's key/value list state. This state is similar to the state
 	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 	 * holds lists. One can add elements to the list, or retrieve the list as a whole.
-	 * 
+	 *
 	 * <p>This state is only accessible if the function is executed on a KeyedStream.
 	 *
 	 * <pre>{@code
@@ -345,7 +345,7 @@ public interface RuntimeContext {
 	 *         return new Tuple2<>(value, state.get());
 	 *     }
 	 * });
-	 * 
+	 *
 	 * }</pre>
 	 *
 	 * @param stateProperties The descriptor defining the properties of the state.
@@ -446,7 +446,7 @@ public interface RuntimeContext {
 	@PublicEvolving
 	@Deprecated
 	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
-	
+
 	/**
 	 * Gets a handle to the system's key/value map state. This state is similar to the state
 	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that

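How a user function typically reaches the pieces listed in this interface can be sketched as follows: an accumulator is registered and a broadcast variable is materialized through a BroadcastVariableInitializer inside open(). The state-access methods require a KeyedStream, so this sketch sticks to accumulators and broadcast variables; the class name, the "allowedKeys" variable name, and the accumulator name are assumptions made for the example.

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.util.HashSet;
import java.util.Set;

/**
 * Keeps only values contained in a broadcast set and counts the dropped records.
 */
public class AllowedKeyFilter extends RichFilterFunction<String> {

	private static final long serialVersionUID = 1L;

	private transient Set<String> allowedKeys;
	private transient IntCounter dropped;

	@Override
	public void open(Configuration parameters) throws Exception {
		dropped = new IntCounter();
		getRuntimeContext().addAccumulator("dropped-records", dropped);

		// materialize the broadcast list into a shared Set exactly once per machine
		allowedKeys = getRuntimeContext().getBroadcastVariableWithInitializer(
				"allowedKeys",
				new BroadcastVariableInitializer<String, Set<String>>() {
					@Override
					public Set<String> initializeBroadcastVariable(Iterable<String> data) {
						Set<String> set = new HashSet<>();
						for (String key : data) {
							set.add(key);
						}
						return set;
					}
				});
	}

	@Override
	public boolean filter(String value) {
		if (allowedKeys.contains(value)) {
			return true;
		}
		dropped.add(1);
		return false;
	}
}
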
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
index e6da8a8..07ef372 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
@@ -3,13 +3,14 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
@@ -25,12 +26,12 @@ public interface StoppableFunction {
 	 * Stops the source. In contrast to {@code cancel()}, this is a request to the source function to shut down
 	 * gracefully. Pending data can still be emitted and the source is not required to stop immediately; it should,
 	 * however, stop in the near future. The job will keep running until all emitted data is processed completely.
-	 * <p>
-	 * Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source
+	 *
+	 * <p>Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source
 	 * will break out of this loop. This can be achieved by having a volatile field "isRunning" that is checked in the
 	 * loop and that is set to false in this method.
-	 * <p>
-	 * <strong>The call to {@code stop()} should not block and not throw any exception.</strong>
+	 *
+	 * <p><strong>The call to {@code stop()} should not block and not throw any exception.</strong>
 	 */
 	void stop();
 }

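The volatile-flag pattern described in the Javadoc, sketched against a hypothetical source; note that run() and emit() are not part of StoppableFunction itself, which only defines stop().

import org.apache.flink.api.common.functions.StoppableFunction;

/**
 * Keeps emitting values until stop() flips the volatile flag.
 */
public class CountingSource implements StoppableFunction {

	private volatile boolean isRunning = true;

	public void run() {
		long counter = 0;
		while (isRunning) {
			emit(counter++);
		}
	}

	@Override
	public void stop() {
		// must not block and must not throw
		isRunning = false;
	}

	private void emit(long value) {
		// placeholder for whatever the concrete source does with the value
	}
}
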
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index d6262c7..f63c45e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -68,13 +68,13 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 	private final Map<String, Accumulator<?, ?>> accumulators;
 
 	private final DistributedCache distributedCache;
-	
+
 	private final MetricGroup metrics;
 
 	public AbstractRuntimeUDFContext(TaskInfo taskInfo,
 										ClassLoader userCodeClassLoader,
 										ExecutionConfig executionConfig,
-										Map<String, Accumulator<?,?>> accumulators,
+										Map<String, Accumulator<?, ?>> accumulators,
 										Map<String, Future<Path>> cpTasks,
 										MetricGroup metrics) {
 		this.taskInfo = checkNotNull(taskInfo);
@@ -164,23 +164,23 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
 		return Collections.unmodifiableMap(this.accumulators);
 	}
-	
+
 	@Override
 	public ClassLoader getUserCodeClassLoader() {
 		return this.userCodeClassLoader;
 	}
-	
+
 	@Override
 	public DistributedCache getDistributedCache() {
 		return this.distributedCache;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@SuppressWarnings("unchecked")
 	private <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name,
-			Class<? extends Accumulator<V, A>> accumulatorClass)
-	{
+			Class<? extends Accumulator<V, A>> accumulatorClass) {
+
 		Accumulator<?, ?> accumulator = accumulators.get(name);
 
 		if (accumulator != null) {
@@ -233,7 +233,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
 	}
-	
+
 	@Override
 	@PublicEvolving
 	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
index c5adb2e..0de1c4d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.util.Iterator;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.TraversableOnceException;
 
+import java.util.Iterator;
+
+/**
+ * Wraps an iterator to return deep copies of the original iterator's elements.
+ *
+ * @param <E> The type of the element returned by the iterator.
+ */
 @Internal
 public class CopyingIterator<E> implements Iterator<E>, Iterable<E> {
 
 	private final Iterator<E> source;
 	private final TypeSerializer<E> serializer;
-	
+
 	private boolean available = true;
-	
+
 	public CopyingIterator(Iterator<E> source, TypeSerializer<E> serializer) {
 		this.source = source;
 		this.serializer = serializer;

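Roughly, usage of the wrapper looks like the following: the serializer for the element type produces the deep copies, so a consumer may hold on to elements even when the producer reuses objects. String elements are used here only to keep the snippet short.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.util.CopyingIterator;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.Arrays;
import java.util.List;

public class CopyingIteratorExample {

	public static void main(String[] args) {
		TypeSerializer<String> serializer =
				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig());

		List<String> input = Arrays.asList("a", "b", "c");

		// every element handed out below is a copy made via the serializer
		for (String element : new CopyingIterator<>(input.iterator(), serializer)) {
			System.out.println(element);
		}
	}
}
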
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
index 16b7bf3..bfbd7ad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.util.List;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.Collector;
 
+import java.util.List;
+
+/**
+ * A {@link Collector} that collects deep copies of its elements in a list.
+ *
+ * @param <T> The type of the collected elements.
+ */
 @Internal
 public class CopyingListCollector<T> implements Collector<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
index c820640..8ffa886 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.util.List;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Collector;
 
+import java.util.List;
+
 /**
  * A {@link Collector} that puts the collected elements into a given list.
- * 
+ *
  * @param <T> The type of the collected elements.
  */
 @Internal

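A quick usage sketch: the collector simply appends into a caller-provided list, which is convenient when exercising a function that emits through a Collector. The class name is made up for the example.

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class ListCollectorExample {

	public static void main(String[] args) {
		List<String> output = new ArrayList<>();
		Collector<String> collector = new ListCollector<>(output);

		collector.collect("hello");
		collector.collect("world");

		System.out.println(output); // prints [hello, world]
	}
}
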
http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
index 6351365..a96f566 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
@@ -21,6 +21,9 @@ package org.apache.flink.api.common.functions.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 
+/**
+ * An {@link AbstractRichFunction} that does nothing.
+ */
 @Internal
 public class NoOpFunction extends AbstractRichFunction {
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/9644df74/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index ba3f85e..bab2eb0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -32,15 +27,20 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
 /**
  * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
  */
 @Internal
 public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 
-	private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();
-	
-	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
+	private final HashMap<String, Object> initializedBroadcastVars = new HashMap<>();
+
+	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<>();
 
 	public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
 								Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
@@ -56,7 +56,7 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 	@Override
 	@SuppressWarnings("unchecked")
 	public <RT> List<RT> getBroadcastVariable(String name) {
-		
+
 		// check if we have an initialized version
 		Object o = this.initializedBroadcastVars.get(name);
 		if (o != null) {
@@ -64,7 +64,7 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 				return (List<RT>) o;
 			}
 			else {
-				throw new IllegalStateException("The broadcast variable with name '" + name + 
+				throw new IllegalStateException("The broadcast variable with name '" + name +
 						"' is not a List. A different call must have requested this variable with a BroadcastVariableInitializer.");
 			}
 		}
@@ -79,11 +79,11 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 			}
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-		
+
 		// check if we have an initialized version
 		Object o = this.initializedBroadcastVars.get(name);
 		if (o != null) {
@@ -101,19 +101,19 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public void setBroadcastVariable(String name, List<?> value) {
 		this.uninitializedBroadcastVars.put(name, value);
 		this.initializedBroadcastVars.remove(name);
 	}
-	
+
 	public void clearBroadcastVariable(String name) {
 		this.uninitializedBroadcastVars.remove(name);
 		this.initializedBroadcastVars.remove(name);
 	}
-	
+
 	public void clearAllBroadcastVariables() {
 		this.uninitializedBroadcastVars.clear();
 		this.initializedBroadcastVars.clear();