You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/07/25 08:19:06 UTC

[4/6] flink git commit: [FLINK-7181] Activate checkstyle flink-java/operators/*

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index ee5ab2e..bac85ee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -33,23 +33,27 @@ import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
 
+/**
+ * An operation that allows storing data results.
+ * @param <T> The type of the elements in the data set that is written by this sink.
+ */
 @Public
 public class DataSink<T> {
-	
+
 	private final OutputFormat<T> format;
-	
+
 	private final TypeInformation<T> type;
-	
+
 	private final DataSet<T> data;
-	
+
 	private String name;
-	
+
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	private ResourceSpec minResources = ResourceSpec.DEFAULT;
@@ -72,14 +76,12 @@ public class DataSink<T> {
 		if (data == null) {
 			throw new IllegalArgumentException("The data set must not be null.");
 		}
-		
-		
+
 		this.format = format;
 		this.data = data;
 		this.type = type;
 	}
 
-
 	@Internal
 	public OutputFormat<T> getFormat() {
 		return format;
@@ -96,7 +98,7 @@ public class DataSink<T> {
 	}
 
 	/**
-	 * Pass a configuration to the OutputFormat
+	 * Pass a configuration to the OutputFormat.
 	 * @param parameters Configuration parameters
 	 */
 	public DataSink<T> withParameters(Configuration parameters) {
@@ -106,9 +108,11 @@ public class DataSink<T> {
 
 	/**
 	 * Sorts each local partition of a {@link org.apache.flink.api.java.tuple.Tuple} data set
-	 * on the specified field in the specified {@link Order} before it is emitted by the output format.<br>
-	 * <b>Note: Only tuple data sets can be sorted using integer field indices.</b><br>
-	 * The tuple data set can be sorted on multiple fields in different orders
+	 * on the specified field in the specified {@link Order} before it is emitted by the output format.
+	 *
+	 * <p><b>Note: Only tuple data sets can be sorted using integer field indices.</b>
+	 *
+	 * <p>The tuple data set can be sorted on multiple fields in different orders
 	 * by chaining {@link #sortLocalOutput(int, Order)} calls.
 	 *
 	 * @param field The Tuple field on which the data set is locally sorted.
@@ -132,7 +136,7 @@ public class DataSink<T> {
 			throw new InvalidProgramException("Selected sort key is not a sortable type");
 		}
 
-		if(this.sortKeyPositions == null) {
+		if (this.sortKeyPositions == null) {
 			// set sorting info
 			this.sortKeyPositions = flatKeys;
 			this.sortOrders = new Order[flatKeys.length];
@@ -144,9 +148,9 @@ public class DataSink<T> {
 			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
 			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
 
-			for(int i=0; i<flatKeys.length; i++) {
-				this.sortKeyPositions[oldLength+i] = flatKeys[i];
-				this.sortOrders[oldLength+i] = order;
+			for (int i = 0; i < flatKeys.length; i++) {
+				this.sortKeyPositions[oldLength + i] = flatKeys[i];
+				this.sortOrders[oldLength + i] = order;
 			}
 		}
 
@@ -155,10 +159,12 @@ public class DataSink<T> {
 
 	/**
 	 * Sorts each local partition of a data set on the field(s) specified by the field expression
-	 * in the specified {@link Order} before it is emitted by the output format.<br>
-	 * <b>Note: Non-composite types can only be sorted on the full element which is specified by
-	 * a wildcard expression ("*" or "_").</b><br>
-	 * Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
+	 * in the specified {@link Order} before it is emitted by the output format.
+	 *
+	 * <p><b>Note: Non-composite types can only be sorted on the full element which is specified by
+	 * a wildcard expression ("*" or "_").</b>
+	 *
+	 * <p>Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
 	 * by chaining {@link #sortLocalOutput(String, Order)} calls.
 	 *
 	 * @param fieldExpression The field expression for the field(s) on which the data set is locally sorted.
@@ -189,7 +195,7 @@ public class DataSink<T> {
 		orders = new Order[numFields];
 		Arrays.fill(orders, order);
 
-		if(this.sortKeyPositions == null) {
+		if (this.sortKeyPositions == null) {
 			// set sorting info
 			this.sortKeyPositions = fields;
 			this.sortOrders = orders;
@@ -199,9 +205,9 @@ public class DataSink<T> {
 			int newLength = oldLength + numFields;
 			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
 			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
-			for(int i=0; i<numFields; i++) {
-				this.sortKeyPositions[oldLength+i] = fields[i];
-				this.sortOrders[oldLength+i] = orders[i];
+			for (int i = 0; i < numFields; i++) {
+				this.sortKeyPositions[oldLength + i] = fields[i];
+				this.sortOrders[oldLength + i] = orders[i];
 			}
 		}
 
@@ -214,16 +220,16 @@ public class DataSink<T> {
 	public Configuration getParameters() {
 		return this.parameters;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public DataSink<T> name(String name) {
 		this.name = name;
 		return this;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected GenericDataSinkBase<T> translateToDataFlow(Operator<T> input) {
 		// select the name (or create a default one)
 		String name = this.name != null ? this.name : this.format.toString();
@@ -231,11 +237,11 @@ public class DataSink<T> {
 		// set input
 		sink.setInput(input);
 		// set parameters
-		if(this.parameters != null) {
+		if (this.parameters != null) {
 			sink.getParameters().addAll(this.parameters);
 		}
 		// set parallelism
-		if(this.parallelism > 0) {
+		if (this.parallelism > 0) {
 			// use specified parallelism
 			sink.setParallelism(this.parallelism);
 		} else {
@@ -243,34 +249,34 @@ public class DataSink<T> {
 			sink.setParallelism(input.getParallelism());
 		}
 
-		if(this.sortKeyPositions != null) {
+		if (this.sortKeyPositions != null) {
 			// configure output sorting
 			Ordering ordering = new Ordering();
-			for(int i=0; i<this.sortKeyPositions.length; i++) {
+			for (int i = 0; i < this.sortKeyPositions.length; i++) {
 				ordering.appendOrdering(this.sortKeyPositions[i], null, this.sortOrders[i]);
 			}
 			sink.setLocalOrder(ordering);
 		}
-		
+
 		return sink;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "DataSink '" + (this.name == null ? "<unnamed>" : this.name) + "' (" + this.format.toString() + ")";
 	}
-	
+
 	/**
 	 * Returns the parallelism of this data sink.
-	 * 
+	 *
 	 * @return The parallelism of this data sink.
 	 */
 	public int getParallelism() {
 		return this.parallelism;
 	}
-	
+
 	/**
 	 * Sets the parallelism for this data sink.
 	 * The degree must be 1 or more.

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index af6f65b..8ae1c7d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -34,7 +34,7 @@ import org.apache.flink.configuration.Configuration;
  * An operation that creates a new data set (data source). The operation acts as the
  * data set on which to apply further transformations. It encapsulates additional
  * configuration parameters, to customize the execution.
- * 
+ *
  * @param <OUT> The type of the elements produced by this data source.
  */
 @Public
@@ -49,25 +49,25 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	private SplitDataProperties<OUT> splitDataProperties;
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a new data source.
-	 * 
+	 *
 	 * @param context The environment in which the data source gets executed.
 	 * @param inputFormat The input format that the data source executes.
 	 * @param type The type of the elements produced by this input format.
 	 */
 	public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
 		super(context, type);
-		
+
 		this.dataSourceLocationName = dataSourceLocationName;
-		
+
 		if (inputFormat == null) {
 			throw new IllegalArgumentException("The input format may not be null.");
 		}
-		
+
 		this.inputFormat = inputFormat;
-		
+
 		if (inputFormat instanceof NonParallelInput) {
 			this.parallelism = 1;
 		}
@@ -75,23 +75,23 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 
 	/**
 	 * Gets the input format that is executed by this data source.
-	 * 
+	 *
 	 * @return The input format that is executed by this data source.
 	 */
 	@Internal
 	public InputFormat<OUT, ?> getInputFormat() {
 		return this.inputFormat;
 	}
-	
+
 	/**
-	 * Pass a configuration to the InputFormat
+	 * Pass a configuration to the InputFormat.
 	 * @param parameters Configuration parameters
 	 */
 	public DataSource<OUT> withParameters(Configuration parameters) {
 		this.parameters = parameters;
 		return this;
 	}
-	
+
 	/**
 	 * @return Configuration for the InputFormat.
 	 */
@@ -99,15 +99,15 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 		return this.parameters;
 	}
 
-
 	/**
 	 * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} for the
 	 * {@link org.apache.flink.core.io.InputSplit}s of this DataSource
 	 * for configurations.
 	 *
-	 * SplitDataProperties can help to generate more efficient execution plans.
-	 * <br>
-	 * <b>
+	 * <p>SplitDataProperties can help to generate more efficient execution plans.
+	 *
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -115,28 +115,28 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	 */
 	@PublicEvolving
 	public SplitDataProperties<OUT> getSplitDataProperties() {
-		if(this.splitDataProperties == null) {
+		if (this.splitDataProperties == null) {
 			this.splitDataProperties = new SplitDataProperties<OUT>(this);
 		}
 		return this.splitDataProperties;
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
-		String name = this.name != null ? this.name : "at "+dataSourceLocationName+" ("+inputFormat.getClass().getName()+")";
+		String name = this.name != null ? this.name : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")";
 		if (name.length() > 150) {
 			name = name.substring(0, 150);
 		}
-		
-		@SuppressWarnings({ "unchecked", "rawtypes" })
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
 		GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
-				new OperatorInformation<OUT>(getType()), name);
+			new OperatorInformation<OUT>(getType()), name);
 		source.setParallelism(parallelism);
-		if(this.parameters != null) {
+		if (this.parameters != null) {
 			source.getParameters().addAll(this.parameters);
 		}
-		if(this.splitDataProperties != null) {
+		if (this.splitDataProperties != null) {
 			source.setSplitDataProperties(this.splitDataProperties);
 		}
 		return source;

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index 61f83b1..dc80e70 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.util.Arrays;
-
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
@@ -35,10 +33,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
+
 /**
  * The DeltaIteration represents the start of a delta iteration. It is created from the DataSet that
  * represents the initial solution set via the {@link DataSet#iterateDelta(DataSet, int, int...)} method.
- * 
+ *
  * @param <ST> The data type of the solution set.
  * @param <WT> The data type of the workset (the feedback data set).
  *
@@ -47,27 +47,27 @@ import org.apache.flink.util.Preconditions;
  */
 @Public
 public class DeltaIteration<ST, WT> {
-	
+
 	private final AggregatorRegistry aggregators = new AggregatorRegistry();
-	
+
 	private final DataSet<ST> initialSolutionSet;
 	private final DataSet<WT> initialWorkset;
-	
+
 	private final SolutionSetPlaceHolder<ST> solutionSetPlaceholder;
 	private final WorksetPlaceHolder<WT> worksetPlaceholder;
 
 	private final Keys<ST> keys;
-	
+
 	private final int maxIterations;
-	
+
 	private String name;
-	
+
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	private ResourceSpec minResources = ResourceSpec.DEFAULT;
 
 	private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
-	
+
 	private boolean solutionSetUnManaged;
 
 	public DeltaIteration(ExecutionEnvironment context, TypeInformation<ST> type, DataSet<ST> solutionSet, DataSet<WT> workset, Keys<ST> keys, int maxIterations) {
@@ -80,15 +80,15 @@ public class DeltaIteration<ST, WT> {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Closes the delta iteration. This method defines the end of the delta iteration's function.
-	 * 
+	 *
 	 * @param solutionSetDelta The delta for the solution set. The delta will be merged into the solution set at the end of
 	 *                         each iteration.
 	 * @param newWorkset The new workset (feedback data set) that will be fed back to the next iteration.
 	 * @return The DataSet that represents the result of the iteration, after the computation has terminated.
-	 * 
+	 *
 	 * @see DataSet#iterateDelta(DataSet, int, int...)
 	 */
 	public DataSet<ST> closeWith(DataSet<ST> solutionSetDelta, DataSet<WT> newWorkset) {
@@ -98,18 +98,18 @@ public class DeltaIteration<ST, WT> {
 
 	/**
 	 * Gets the initial solution set. This is the data set on which the delta iteration was started.
-	 * <p>
-	 * Consider the following example:
+	 *
+	 * <p>Consider the following example:
 	 * <pre>
 	 * {@code
 	 * DataSet<MyType> solutionSetData = ...;
 	 * DataSet<AnotherType> worksetData = ...;
-	 * 
+	 *
 	 * DeltaIteration<MyType, AnotherType> iteration = solutionSetData.iteratorDelta(worksetData, 10, ...);
 	 * }
 	 * </pre>
 	 * The <tt>solutionSetData</tt> would be the data set returned by {@code iteration.getInitialSolutionSet();}.
-	 * 
+	 *
 	 * @return The data set that forms the initial solution set.
 	 */
 	public DataSet<ST> getInitialSolutionSet() {
@@ -119,18 +119,18 @@ public class DeltaIteration<ST, WT> {
 	/**
 	 * Gets the initial workset. This is the data set passed to the method that starts the delta
 	 * iteration.
-	 * <p>
-	 * Consider the following example:
+	 *
+	 * <p>Consider the following example:
 	 * <pre>
 	 * {@code
 	 * DataSet<MyType> solutionSetData = ...;
 	 * DataSet<AnotherType> worksetData = ...;
-	 * 
+	 *
 	 * DeltaIteration<MyType, AnotherType> iteration = solutionSetData.iteratorDelta(worksetData, 10, ...);
 	 * }
 	 * </pre>
 	 * The <tt>worksetData</tt> would be the data set returned by {@code iteration.getInitialWorkset();}.
-	 * 
+	 *
 	 * @return The data set that forms the initial workset.
 	 */
 	public DataSet<WT> getInitialWorkset() {
@@ -139,7 +139,7 @@ public class DeltaIteration<ST, WT> {
 
 	/**
 	 * Gets the solution set of the delta iteration. The solution set represents the state that is kept across iterations.
-	 * 
+	 *
 	 * @return The solution set of the delta iteration.
 	 */
 	public SolutionSetPlaceHolder<ST> getSolutionSet() {
@@ -157,7 +157,7 @@ public class DeltaIteration<ST, WT> {
 
 	/**
 	 * Sets the name for the iteration. The name is displayed in logs and messages.
-	 * 
+	 *
 	 * @param name The name for the iteration.
 	 * @return The iteration object, for function call chaining.
 	 */
@@ -165,16 +165,16 @@ public class DeltaIteration<ST, WT> {
 		this.name = name;
 		return this;
 	}
-	
+
 	/**
 	 * Gets the name from this iteration.
-	 * 
+	 *
 	 * @return The name of the iteration.
 	 */
 	public String getName() {
 		return name;
 	}
-	
+
 	/**
 	 * Sets the parallelism for the iteration.
 	 *
@@ -187,10 +187,10 @@ public class DeltaIteration<ST, WT> {
 		this.parallelism = parallelism;
 		return this;
 	}
-	
+
 	/**
 	 * Gets the iteration's parallelism.
-	 * 
+	 *
 	 * @return The iteration's parallelism, or {@link ExecutionConfig#PARALLELISM_DEFAULT} if not set.
 	 */
 	public int getParallelism() {
@@ -266,13 +266,13 @@ public class DeltaIteration<ST, WT> {
 	 * iteration, such as number of elements processed. The aggregators compute global aggregates: After each iteration step,
 	 * the values are globally aggregated to produce one aggregate that represents statistics across all parallel instances.
 	 * The value of an aggregator can be accessed in the next iteration.
-	 * <p>
-	 * Aggregators can be accessed inside a function via the
+	 *
+	 * <p>Aggregators can be accessed inside a function via the
 	 * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()} method.
-	 * 
+	 *
 	 * @param name The name under which the aggregator is registered.
 	 * @param aggregator The aggregator class.
-	 * 
+	 *
 	 * @return The DeltaIteration itself, to allow chaining function calls.
 	 */
 	@PublicEvolving
@@ -297,62 +297,61 @@ public class DeltaIteration<ST, WT> {
 	 */
 	@PublicEvolving
 	public <X extends Value> DeltaIteration<ST, WT> registerAggregationConvergenceCriterion(
-			String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck)
-	{
+			String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck) {
 		this.aggregators.registerAggregationConvergenceCriterion(name, aggregator, convergenceCheck);
 		return this;
 	}
-	
+
 	/**
 	 * Gets the registry for aggregators for the iteration.
-	 * 
+	 *
 	 * @return The registry with all aggregators.
 	 */
 	@PublicEvolving
 	public AggregatorRegistry getAggregators() {
 		return this.aggregators;
 	}
-	
+
 	/**
 	 * Sets whether to keep the solution set in managed memory (safe against heap exhaustion) or unmanaged memory
 	 * (objects on heap).
-	 * 
+	 *
 	 * @param solutionSetUnManaged True to keep the solution set in unmanaged memory, false to keep it in managed memory.
-	 * 
+	 *
 	 * @see #isSolutionSetUnManaged()
 	 */
 	public void setSolutionSetUnManaged(boolean solutionSetUnManaged) {
 		this.solutionSetUnManaged = solutionSetUnManaged;
 	}
-	
+
 	/**
 	 * gets whether the solution set is in managed or unmanaged memory.
-	 * 
+	 *
 	 * @return True, if the solution set is in unmanaged memory (object heap), false if in managed memory.
-	 * 
+	 *
 	 * @see #setSolutionSetUnManaged(boolean)
 	 */
 	public boolean isSolutionSetUnManaged() {
 		return solutionSetUnManaged;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * A {@link DataSet} that acts as a placeholder for the solution set during the iteration.
-	 * 
+	 *
 	 * @param <ST> The type of the elements in the solution set.
 	 */
 	@Public
 	public static class SolutionSetPlaceHolder<ST> extends DataSet<ST>{
-		
+
 		private final DeltaIteration<ST, ?> deltaIteration;
-		
+
 		private SolutionSetPlaceHolder(ExecutionEnvironment context, TypeInformation<ST> type, DeltaIteration<ST, ?> deltaIteration) {
 			super(context, type);
 			this.deltaIteration = deltaIteration;
 		}
-		
+
 		public void checkJoinKeyFields(int[] keyFields) {
 			int[] ssKeys = deltaIteration.keys.computeLogicalKeyPositions();
 			if (!Arrays.equals(ssKeys, keyFields)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
index e593488..de93dbb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
@@ -17,6 +17,7 @@
  */
 
 package org.apache.flink.api.java.operators;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.operators.Keys;
@@ -24,19 +25,24 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
+/**
+ * Resulting {@link DataSet} of a delta iteration operation.
+ * @param <ST> The data type of the solution set.
+ * @param <WT> The data type of the workset (the feedback data set).
+ */
 @Public
 public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
 
 	private DeltaIteration<ST, WT> iterationHead;
 
 	private DataSet<ST> nextSolutionSet;
-	
+
 	private DataSet<WT> nextWorkset;
-	
+
 	private Keys<ST> keys;
-	
+
 	private int maxIterations;
-	
+
 	private TypeInformation<WT> typeWS;
 
 	DeltaIterationResultSet(ExecutionEnvironment context,
@@ -46,8 +52,7 @@ public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
 							DataSet<ST> nextSolutionSet,
 							DataSet<WT> nextWorkset,
 							Keys<ST> keys,
-							int maxIterations)
-	{
+							int maxIterations) {
 		super(context, typeSS);
 		this.iterationHead = iterationHead;
 		this.nextWorkset = nextWorkset;
@@ -60,7 +65,7 @@ public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
 	public DeltaIteration<ST, WT> getIterationHead() {
 		return iterationHead;
 	}
-	
+
 	public DataSet<ST> getNextSolutionSet() {
 		return nextSolutionSet;
 	}
@@ -78,7 +83,7 @@ public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
 	public int getMaxIterations() {
 		return maxIterations;
 	}
-	
+
 	public TypeInformation<WT> getWorksetType() {
 		return typeWS;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 267513d..9dc2a9c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -111,7 +111,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	/**
 	 * Sets the strategy to use for the combine phase of the reduce.
 	 *
-	 * If this method is not called, then the default hint will be used.
+	 * <p>If this method is not called, then the default hint will be used.
 	 * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint#OPTIMIZER_CHOOSES})
 	 *
 	 * @param strategy The hint to use.
@@ -132,8 +132,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 			String name,
 			Operator<IN> input,
 			int parallelism,
-			CombineHint hint)
-	{
+			CombineHint hint) {
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 
@@ -150,7 +149,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	}
 
 	@Internal
-	public static final class DistinctFunction<T> implements ReduceFunction<T> {
+	private static final class DistinctFunction<T> implements ReduceFunction<T> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index ae8b5ea..901274e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -22,45 +22,45 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
 
 /**
  * This operator represents the application of a "filter" function on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <T> The type of the data set filtered by the operator.
  */
 @Public
 public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
-	
+
 	protected final FilterFunction<T> function;
-	
+
 	protected final String defaultName;
 
 	public FilterOperator(DataSet<T> input, FilterFunction<T> function, String defaultName) {
 		super(input, input.getType());
-		
+
 		this.function = function;
 		this.defaultName = defaultName;
 
 		UdfOperatorUtils.analyzeSingleInputUdf(this, FilterFunction.class, defaultName, function, null);
 	}
-	
+
 	@Override
 	protected FilterFunction<T> getFunction() {
 		return function;
 	}
-	
+
 	@Override
-	protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
-		
-		String name = getName() != null ? getName() : "Filter at "+defaultName;
-		
+	protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T, T>> translateToDataFlow(Operator<T> input) {
+
+		String name = getName() != null ? getName() : "Filter at " + defaultName;
+
 		// create operator
 		PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
 		po.setInput(input);
-		
+
 		// set parallelism
 		if (getParallelism() > 0) {
 			// use specified parallelism
@@ -69,7 +69,7 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
 			// if no parallelism has been specified, use parallelism of input operator to enable chaining
 			po.setParallelism(input.getParallelism());
 		}
-		
+
 		return po;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index ed730ae..56d7cb8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -29,48 +29,48 @@ import org.apache.flink.api.java.DataSet;
 /**
  * This operator represents the application of a "flatMap" function on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <IN> The type of the data set consumed by the operator.
  * @param <OUT> The type of the data set created by the operator.
  */
 @Public
 public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {
-	
+
 	protected final FlatMapFunction<IN, OUT> function;
-	
+
 	protected final String defaultName;
-	
+
 	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
-		
+
 		this.function = function;
 		this.defaultName = defaultName;
 
 		UdfOperatorUtils.analyzeSingleInputUdf(this, FlatMapFunction.class, defaultName, function, null);
 	}
-	
+
 	@Override
 	protected FlatMapFunction<IN, OUT> getFunction() {
 		return function;
 	}
 
 	@Override
-	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
-		String name = getName() != null ? getName() : "FlatMap at "+defaultName;
+	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+		String name = getName() != null ? getName() : "FlatMap at " + defaultName;
 		// create operator
 		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function,
-				new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+			new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input
 		po.setInput(input);
 		// set parallelism
-		if(this.getParallelism() > 0) {
+		if (this.getParallelism() > 0) {
 			// use specified parallelism
 			po.setParallelism(this.getParallelism());
 		} else {
 			// if no parallelism has been specified, use parallelism of input operator to enable chaining
 			po.setParallelism(input.getParallelism());
 		}
-		
+
 		return po;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 0c8e657..e4ed07f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -33,13 +34,12 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 
 /**
  * This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data
- * locally in their partitions. The combine part can return an arbitrary data type. This is useful to pre-combine values 
+ * locally in their partitions. The combine part can return an arbitrary data type. This is useful to pre-combine values
  * into an intermediate representation before applying a proper reduce operation.
  *
  * @param <IN> The type of the data set consumed by the operator.
@@ -95,12 +95,12 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		SingleInputSemanticProperties props = super.getSemanticProperties();
 
 		// offset semantic information by extracted key fields
-		if(props != null &&
+		if (props != null &&
 				this.grouper != null &&
 				this.grouper.keys instanceof SelectorFunctionKeys) {
 
-			int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
-			if(this.grouper instanceof SortedGrouping) {
+			int offset = ((SelectorFunctionKeys<?, ?>) this.grouper.keys).getKeyType().getTotalFields();
+			if (this.grouper instanceof SortedGrouping) {
 				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 
@@ -174,7 +174,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
 
 				Ordering o = new Ordering();
-				for(int i=0; i < sortKeyPositions.length; i++) {
+				for (int i = 0; i < sortKeyPositions.length; i++) {
 					o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
 				}
 				po.setGroupOrder(o);
@@ -187,7 +187,6 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		}
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("unchecked")
@@ -196,8 +195,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 			GroupCombineFunction<IN, OUT> function,
 			TypeInformation<OUT> outputType,
 			String name,
-			Operator<IN> input)
-	{
+			Operator<IN> input) {
 		final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 
 		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
@@ -218,10 +216,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 			GroupCombineFunction<IN, OUT> function,
 			TypeInformation<OUT> outputType,
 			String name,
-			Operator<IN> input)
-	{
+			Operator<IN> input) {
 		final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
-		final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>)rawSortingKeys;
+		final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKeys;
 		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey, sortingKey);
 
 		Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index b339baf..069ac44 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.functions.CombineFunction;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.operators.translation.CombineToGroupCombineWrapper;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -32,15 +33,15 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
-import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.operators.translation.CombineToGroupCombineWrapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
 import org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.DataSet;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,7 @@ import java.lang.reflect.Type;
 /**
  * This operator represents the application of a "reduceGroup" function on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <IN> The type of the data set consumed by the operator.
  * @param <OUT> The type of the data set created by the operator.
  */
@@ -62,14 +63,14 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	private GroupReduceFunction<IN, OUT> function;
 
 	private final Grouping<IN> grouper;
-	
+
 	private final String defaultName;
 
 	private boolean combinable;
 
 	/**
 	 * Constructor for a non-grouped reduce (all reduce).
-	 * 
+	 *
 	 * @param input The input data set to the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
@@ -82,10 +83,10 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 
 		this.combinable = checkCombinability();
 	}
-	
+
 	/**
 	 * Constructor for a grouped reduce.
-	 * 
+	 *
 	 * @param input The grouped input to be processed group-wise by the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
@@ -149,14 +150,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		}
 		return false;
 	}
-	
-	
+
 	@Override
 	protected GroupReduceFunction<IN, OUT> getFunction() {
 		return function;
 	}
 
-	
 	// --------------------------------------------------------------------------------------------
 	//  Properties
 	// --------------------------------------------------------------------------------------------
@@ -164,10 +163,10 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	public boolean isCombinable() {
 		return combinable;
 	}
-	
+
 	public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {
 
-		if(combinable) {
+		if (combinable) {
 			// sanity check that the function is a subclass of the combine interface
 			if (!checkCombinability()) {
 				throw new IllegalArgumentException("Either the function does not implement a combine interface, " +
@@ -188,12 +187,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		SingleInputSemanticProperties props = super.getSemanticProperties();
 
 		// offset semantic information by extracted key fields
-		if(props != null &&
+		if (props != null &&
 				this.grouper != null &&
 				this.grouper.keys instanceof SelectorFunctionKeys) {
 
-			int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
-			if(this.grouper instanceof SortedGrouping) {
+			int offset = ((SelectorFunctionKeys<?, ?>) this.grouper.keys).getKeyType().getTotalFields();
+			if (this.grouper instanceof SortedGrouping) {
 				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
@@ -205,7 +204,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	@SuppressWarnings("unchecked")
 	protected GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
@@ -225,16 +224,16 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
 			GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po =
 					new GroupReduceOperatorBase<>(function, operatorInfo, new int[0], name);
-			
+
 			po.setCombinable(combinable);
 			po.setInput(input);
 			// the parallelism for a non grouped reduce can only be 1
 			po.setParallelism(1);
 			return po;
 		}
-	
+
 		if (grouper.getKeys() instanceof SelectorFunctionKeys) {
-		
+
 			@SuppressWarnings("unchecked")
 			SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys();
 
@@ -271,29 +270,28 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			po.setInput(input);
 			po.setParallelism(getParallelism());
 			po.setCustomPartitioner(grouper.getCustomPartitioner());
-			
+
 			// set group order
 			if (grouper instanceof SortedGrouping) {
 				SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
 
 				int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
 				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
-				
+
 				Ordering o = new Ordering();
-				for(int i=0; i < sortKeyPositions.length; i++) {
+				for (int i = 0; i < sortKeyPositions.length; i++) {
 					o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
 				}
 				po.setGroupOrder(o);
 			}
-			
+
 			return po;
 		}
 		else {
 			throw new UnsupportedOperationException("Unrecognized key type.");
 		}
 	}
-	
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("unchecked")
@@ -303,8 +301,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			TypeInformation<OUT> outputType,
 			String name,
 			Operator<IN> input,
-			boolean combinable)
-	{
+			boolean combinable) {
 		SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
 
@@ -326,11 +323,10 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		TypeInformation<OUT> outputType,
 		String name,
 		Operator<IN> input,
-		boolean combinable)
-	{
+		boolean combinable) {
 		final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
 		final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKey;
-		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey,sortingKey);
+		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey, sortingKey);
 
 		Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index dbaaa9d..74bd9e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -26,8 +26,9 @@ import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.DataSet;
 
 /**
- * Grouping is an intermediate step for a transformation on a grouped DataSet.<br>
- * The following transformation can be applied on Grouping:
+ * Grouping is an intermediate step for a transformation on a grouped DataSet.
+ *
+ * <p>The following transformations can be applied on Grouping:
  * <ul>
  * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
  * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}, and</li>
@@ -35,24 +36,23 @@ import org.apache.flink.api.java.DataSet;
  * </ul>
  *
  * @param <T> The type of the elements of the grouped DataSet.
- * 
+ *
  * @see DataSet
  */
 @Public
 public abstract class Grouping<T> {
-	
+
 	protected final DataSet<T> inputDataSet;
-	
+
 	protected final Keys<T> keys;
-	
+
 	protected Partitioner<?> customPartitioner;
 
-	
 	public Grouping(DataSet<T> set, Keys<T> keys) {
 		if (set == null || keys == null) {
 			throw new NullPointerException();
 		}
-		
+
 		if (keys.isEmpty()) {
 			throw new InvalidProgramException("The grouping keys must not be empty.");
 		}
@@ -60,17 +60,17 @@ public abstract class Grouping<T> {
 		this.inputDataSet = set;
 		this.keys = keys;
 	}
-	
+
 	/**
 	 * Returns the input DataSet of a grouping operation, that is the one before the grouping. This means that
 	 * if it is applied directly to the result of a grouping operation, it will cancel its effect. As an example, in the
 	 * following snippet:
-	 * <pre><code>
+	 * <pre>{@code
 	 * DataSet<X> notGrouped = input.groupBy().getDataSet();
 	 * DataSet<Y> allReduced = notGrouped.reduce()
-	 * </pre></code>
-	 * the <code>groupBy()</code> is as if it never happened, as the <code>notGrouped</code> DataSet corresponds
-	 * to the input of the <code>groupBy()</code> (because of the <code>getDataset()</code>).
+	 * }</pre>
+	 * the {@code groupBy()} is as if it never happened, as the {@code notGrouped} DataSet corresponds
+	 * to the input of the {@code groupBy()} (because of the {@code getDataSet()}).
 	 * */
 	@Internal
 	public DataSet<T> getInputDataSet() {
@@ -81,11 +81,11 @@ public abstract class Grouping<T> {
 	public Keys<T> getKeys() {
 		return this.keys;
 	}
-	
+
 	/**
 	 * Gets the custom partitioner to be used for this grouping, or {@code null}, if
 	 * none was defined.
-	 * 
+	 *
 	 * @return The custom partitioner to be used for this grouping.
 	 */
 	@Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
index c7ff6ab..d522f02 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
@@ -31,9 +31,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.types.Value;
 
 /**
- * The IterativeDataSet represents the start of an iteration. It is created from the DataSet that 
+ * The IterativeDataSet represents the start of an iteration. It is created from the DataSet that
  * represents the initial solution set via the {@link DataSet#iterate(int)} method.
- * 
+ *
  * @param <T> The data type of set that is the input and feedback of the iteration.
  *
  * @see DataSet#iterate(int)
@@ -42,33 +42,33 @@ import org.apache.flink.types.Value;
 public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeDataSet<T>> {
 
 	private final AggregatorRegistry aggregators = new AggregatorRegistry();
-	
+
 	private int maxIterations;
 
 	public IterativeDataSet(ExecutionEnvironment context, TypeInformation<T> type, DataSet<T> input, int maxIterations) {
 		super(input, type);
 		this.maxIterations = maxIterations;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Closes the iteration. This method defines the end of the iterative program part.
-	 * 
+	 *
 	 * @param iterationResult The data set that will be fed back to the next iteration.
 	 * @return The DataSet that represents the result of the iteration, after the computation has terminated.
-	 * 
+	 *
 	 * @see DataSet#iterate(int)
 	 */
 	public DataSet<T> closeWith(DataSet<T> iterationResult) {
 		return new BulkIterationResultSet<T>(getExecutionEnvironment(), getType(), this, iterationResult);
 	}
-	
+
 	/**
 	 * Closes the iteration and specifies a termination criterion. This method defines the end of
 	 * the iterative program part.
-	 * <p>
-	 * The termination criterion is a means of dynamically signaling the iteration to halt. It is expressed via a data
+	 *
+	 * <p>The termination criterion is a means of dynamically signaling the iteration to halt. It is expressed via a data
 	 * set that will trigger to halt the loop as soon as the data set is empty. A typical way of using the termination
 	 * criterion is to have a filter that filters out all elements that are considered non-converged. As soon as no more
 	 * such elements exist, the iteration finishes.
@@ -76,7 +76,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
 	 * @param iterationResult The data set that will be fed back to the next iteration.
 	 * @param terminationCriterion The data set that being used to trigger halt on operation once it is empty.
 	 * @return The DataSet that represents the result of the iteration, after the computation has terminated.
-	 * 
+	 *
 	 * @see DataSet#iterate(int)
 	 */
 	public DataSet<T> closeWith(DataSet<T> iterationResult, DataSet<?> terminationCriterion) {
@@ -85,25 +85,25 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
 
 	/**
 	 * Gets the maximum number of iterations.
-	 * 
+	 *
 	 * @return The maximum number of iterations.
 	 */
 	public int getMaxIterations() {
 		return maxIterations;
 	}
-	
+
 	/**
 	 * Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the
 	 * iteration, such as number of elements processed. The aggregators compute global aggregates: After each iteration step,
 	 * the values are globally aggregated to produce one aggregate that represents statistics across all parallel instances.
 	 * The value of an aggregator can be accessed in the next iteration.
-	 * <p>
-	 * Aggregators can be accessed inside a function via the
+	 *
+	 * <p>Aggregators can be accessed inside a function via the
 	 * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()} method.
-	 * 
+	 *
 	 * @param name The name under which the aggregator is registered.
 	 * @param aggregator The aggregator class.
-	 * 
+	 *
 	 * @return The IterativeDataSet itself, to allow chaining function calls.
 	 */
 	@PublicEvolving
@@ -111,7 +111,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
 		this.aggregators.registerAggregator(name, aggregator);
 		return this;
 	}
-	
+
 	/**
 	 * Registers an {@link Aggregator} for the iteration together with a {@link ConvergenceCriterion}. For a general description
 	 * of aggregators, see {@link #registerAggregator(String, Aggregator)} and {@link Aggregator}.
@@ -119,33 +119,32 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
 	 * the iteration should terminate. A typical use case is to have an aggregator that sums up the total error of change
 	 * in an iteration step and have to have a convergence criterion that signals termination as soon as the aggregate value
 	 * is below a certain threshold.
-	 * 
+	 *
 	 * @param name The name under which the aggregator is registered.
 	 * @param aggregator The aggregator class.
 	 * @param convergenceCheck The convergence criterion.
-	 * 
+	 *
 	 * @return The IterativeDataSet itself, to allow chaining function calls.
 	 */
 	@PublicEvolving
 	public <X extends Value> IterativeDataSet<T> registerAggregationConvergenceCriterion(
-			String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck)
-	{
+			String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck) {
 		this.aggregators.registerAggregationConvergenceCriterion(name, aggregator, convergenceCheck);
 		return this;
 	}
-	
+
 	/**
-	 * Gets the registry for aggregators. On the registry, one can add {@link Aggregator}s and an aggregator-based 
+	 * Gets the registry for aggregators. On the registry, one can add {@link Aggregator}s and an aggregator-based
 	 * {@link ConvergenceCriterion}. This method offers an alternative way to registering the aggregators via
-	 * {@link #registerAggregator(String, Aggregator)} and {@link #registerAggregationConvergenceCriterion(String, Aggregator, ConvergenceCriterion))}.
-	 * 
+	 * {@link #registerAggregator(String, Aggregator)} and {@link #registerAggregationConvergenceCriterion(String, Aggregator, ConvergenceCriterion)}.
+	 *
 	 * @return The registry for aggregators.
 	 */
 	@PublicEvolving
 	public AggregatorRegistry getAggregators() {
 		return aggregators;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override