Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/25 11:31:21 UTC

[5/8] flink git commit: [FLINK-7190] [java] Activate checkstyle flink-java/*

[FLINK-7190] [java] Activate checkstyle flink-java/*

This closes #4343.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8624c290
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8624c290
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8624c290

Branch: refs/heads/master
Commit: 8624c2904f2840b901e805f1e0b33b6977e581ca
Parents: 87a1984
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:37:49 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 25 11:27:47 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/ClosureCleaner.java   |  44 +-
 .../flink/api/java/CollectionEnvironment.java   |   5 +-
 .../java/org/apache/flink/api/java/DataSet.java | 879 ++++++++++---------
 .../flink/api/java/ExecutionEnvironment.java    | 308 ++++---
 .../api/java/ExecutionEnvironmentFactory.java   |   4 +-
 .../apache/flink/api/java/LocalEnvironment.java |  34 +-
 .../flink/api/java/RemoteEnvironment.java       |  42 +-
 .../java/org/apache/flink/api/java/Utils.java   |  30 +-
 .../api/common/io/SequentialFormatTestBase.java |  36 +-
 .../api/common/io/SerializedFormatTest.java     |   8 +-
 .../CollectionExecutionAccumulatorsTest.java    |  36 +-
 .../CollectionExecutionIterationTest.java       |  50 +-
 ...ctionExecutionWithBroadcastVariableTest.java |  66 +-
 .../base/CoGroupOperatorCollectionTest.java     |   5 +-
 .../operators/base/GroupReduceOperatorTest.java |  21 +-
 .../base/InnerJoinOperatorBaseTest.java         |  15 +-
 .../operators/base/ReduceOperatorTest.java      |  14 +-
 .../flink/api/java/MultipleInvokationsTest.java |  27 +-
 .../flink/api/java/TypeExtractionTest.java      |  15 +-
 .../flink/api/java/tuple/TupleGenerator.java    |  69 +-
 tools/maven/suppressions-java.xml               |   8 -
 21 files changed, 897 insertions(+), 819 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index 2f22a75..dd4b5c5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -26,7 +26,6 @@ import org.objectweb.asm.ClassReader;
 import org.objectweb.asm.ClassVisitor;
 import org.objectweb.asm.MethodVisitor;
 import org.objectweb.asm.Opcodes;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,20 +40,20 @@ import java.lang.reflect.Field;
  */
 @Internal
 public class ClosureCleaner {
-	
-	private static Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);
-	
+
+	private static final Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);
+
 	/**
 	 * Tries to clean the closure of the given object, if the object is a non-static inner
 	 * class.
-	 * 
+	 *
 	 * @param func The object whose closure should be cleaned.
 	 * @param checkSerializable Flag to indicate whether serializability should be checked after
 	 *                          the closure cleaning attempt.
-	 * 
+	 *
 	 * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was
 	 *                                 not serializable after the closure cleaning.
-	 * 
+	 *
 	 * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could not
 	 *                          be loaded, in order to process during teh closure cleaning.
 	 */
@@ -62,32 +61,31 @@ public class ClosureCleaner {
 		if (func == null) {
 			return;
 		}
-		
+
 		final Class<?> cls = func.getClass();
 
 		// First find the field name of the "this$0" field, this can
 		// be "this$x" depending on the nesting
 		boolean closureAccessed = false;
-		
+
 		for (Field f: cls.getDeclaredFields()) {
 			if (f.getName().startsWith("this$")) {
 				// found a closure referencing field - now try to clean
 				closureAccessed |= cleanThis0(func, cls, f.getName());
 			}
 		}
-		
+
 		if (checkSerializable) {
 			try {
 				InstantiationUtil.serializeObject(func);
 			}
 			catch (Exception e) {
 				String functionType = getSuperClassOrInterfaceName(func.getClass());
-				
+
 				String msg = functionType == null ?
 						(func + " is not serializable.") :
 						("The implementation of the " + functionType + " is not serializable.");
-				
-				
+
 				if (closureAccessed) {
 					msg += " The implementation accesses fields of its enclosing class, which is " +
 							"a common reason for non-serializability. " +
@@ -96,7 +94,7 @@ public class ClosureCleaner {
 				} else {
 					msg += " The object probably contains or references non serializable fields.";
 				}
-				
+
 				throw new InvalidProgramException(msg, e);
 			}
 		}
@@ -109,14 +107,14 @@ public class ClosureCleaner {
 			throw new InvalidProgramException("Object " + obj + " is not serializable", e);
 		}
 	}
-	
+
 	private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
-		
+
 		This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
 		getClassReader(cls).accept(this0Finder, 0);
-		
+
 		final boolean accessesClosure = this0Finder.isThis0Accessed();
-				
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(this0Name + " is accessed: " + accessesClosure);
 		}
@@ -129,7 +127,7 @@ public class ClosureCleaner {
 				// has no this$0, just return
 				throw new RuntimeException("Could not set " + this0Name + ": " + e);
 			}
-			
+
 			try {
 				this0.setAccessible(true);
 				this0.set(func, null);
@@ -139,10 +137,10 @@ public class ClosureCleaner {
 				throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e);
 			}
 		}
-		
+
 		return accessesClosure;
 	}
-	
+
 	private static ClassReader getClassReader(Class<?> cls) {
 		String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
 		try {
@@ -151,8 +149,7 @@ public class ClosureCleaner {
 			throw new RuntimeException("Could not create ClassReader: " + e.getMessage(), e);
 		}
 	}
-	
-	
+
 	private static String getSuperClassOrInterfaceName(Class<?> cls) {
 		Class<?> superclass = cls.getSuperclass();
 		if (superclass.getName().startsWith("org.apache.flink")) {
@@ -176,7 +173,6 @@ class This0AccessFinder extends ClassVisitor {
 
 	private final String this0Name;
 	private boolean isThis0Accessed;
-	
 
 	public This0AccessFinder(String this0Name) {
 		super(Opcodes.ASM5);

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index 0d66286..bcfaac0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.CollectionExecutor;
 
+/**
+ * Version of {@link ExecutionEnvironment} that allows serial, local, collection-based executions of Flink programs.
+ */
 @PublicEvolving
 public class CollectionEnvironment extends ExecutionEnvironment {
 
@@ -40,7 +43,7 @@ public class CollectionEnvironment extends ExecutionEnvironment {
 	public int getParallelism() {
 		return 1; // always serial
 	}
-	
+
 	@Override
 	public String getExecutionPlan() throws Exception {
 		throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e3b2ec2..3dd4f6a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
@@ -64,7 +65,6 @@ import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.PartitionOperator;
@@ -93,8 +93,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * A DataSet represents a collection of elements of the same type.<br>
- * A DataSet can be transformed into another DataSet by applying a transformation as for example 
+ * A DataSet represents a collection of elements of the same type.
+ *
+ * <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
  * <ul>
  *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li>
  *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
@@ -106,15 +107,14 @@ import java.util.List;
  */
 @Public
 public abstract class DataSet<T> {
-	
+
 	protected final ExecutionEnvironment context;
-	
+
 	// NOTE: the type must not be accessed directly, but only via getType()
 	private TypeInformation<T> type;
-	
+
 	private boolean typeUsed = false;
-	
-	
+
 	protected DataSet(ExecutionEnvironment context, TypeInformation<T> typeInfo) {
 		if (context == null) {
 			throw new NullPointerException("context is null");
@@ -129,27 +129,27 @@ public abstract class DataSet<T> {
 
 	/**
 	 * Returns the {@link ExecutionEnvironment} in which this DataSet is registered.
-	 * 
+	 *
 	 * @return The ExecutionEnvironment in which this DataSet is registered.
-	 * 
+	 *
 	 * @see ExecutionEnvironment
 	 */
 	public ExecutionEnvironment getExecutionEnvironment() {
 		return this.context;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Type Information handling
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Tries to fill in the type information. Type information can be filled in later when the program uses
 	 * a type hint. This method checks whether the type information has ever been accessed before and does not
 	 * allow modifications if the type was accessed already. This ensures consistency by making sure different
 	 * parts of the operation do not assume different type information.
-	 *   
+	 *
 	 * @param typeInfo The type information to fill in.
-	 * 
+	 *
 	 * @throws IllegalStateException Thrown, if the type information has been accessed before.
 	 */
 	protected void fillInType(TypeInformation<T> typeInfo) {
@@ -160,12 +160,12 @@ public abstract class DataSet<T> {
 		}
 		this.type = typeInfo;
 	}
-	
+
 	/**
 	 * Returns the {@link TypeInformation} for the type of this DataSet.
-	 * 
+	 *
 	 * @return The TypeInformation for the type of this DataSet.
-	 * 
+	 *
 	 * @see TypeInformation
 	 */
 	public TypeInformation<T> getType() {
@@ -193,15 +193,16 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	//  Filter & Transformations
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Applies a Map transformation on this DataSet.<br>
-	 * The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet.
+	 * Applies a Map transformation on this DataSet.
+	 *
+	 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet.
 	 * Each MapFunction call returns exactly one element.
-	 * 
+	 *
 	 * @param mapper The MapFunction that is called for each element of the DataSet.
 	 * @return A MapOperator that represents the transformed DataSet.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.functions.MapFunction
 	 * @see org.apache.flink.api.common.functions.RichMapFunction
 	 * @see MapOperator
@@ -216,8 +217,6 @@ public abstract class DataSet<T> {
 		return new MapOperator<>(this, resultType, clean(mapper), callLocation);
 	}
 
-
-
 	/**
 	 * Applies a Map-style operation to the entire partition of the data.
 	 * The function is called once per parallel partition of the data,
@@ -225,7 +224,7 @@ public abstract class DataSet<T> {
 	 * The number of elements that each instance of the MapPartition function
 	 * sees is non deterministic and depends on the parallelism of the operation.
 	 *
-	 * This function is intended for operations that cannot transform individual elements,
+	 * <p>This function is intended for operations that cannot transform individual elements,
 	 * requires no grouping of elements. To transform individual elements,
 	 * the use of {@code map()} and {@code flatMap()} is preferable.
 	 *
@@ -235,24 +234,25 @@ public abstract class DataSet<T> {
 	 * @see MapPartitionFunction
 	 * @see MapPartitionOperator
 	 */
-	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition ){
+	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition) {
 		if (mapPartition == null) {
 			throw new NullPointerException("MapPartition function must not be null.");
 		}
-		
+
 		String callLocation = Utils.getCallLocationName();
 		TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, getType(), callLocation, true);
 		return new MapPartitionOperator<>(this, resultType, clean(mapPartition), callLocation);
 	}
-	
+
 	/**
-	 * Applies a FlatMap transformation on a {@link DataSet}.<br>
-	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
+	 * Applies a FlatMap transformation on a {@link DataSet}.
+	 *
+	 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
 	 * Each FlatMapFunction call can return any number of elements including none.
-	 * 
-	 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet. 
+	 *
+	 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
 	 * @return A FlatMapOperator that represents the transformed DataSet.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.functions.RichFlatMapFunction
 	 * @see FlatMapOperator
 	 * @see DataSet
@@ -266,16 +266,17 @@ public abstract class DataSet<T> {
 		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
 		return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
 	}
-	
+
 	/**
-	 * Applies a Filter transformation on a {@link DataSet}.<br>
-	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichFilterFunction} for each element of the DataSet
-	 * and retains only those element for which the function returns true. Elements for 
-	 * which the function returns false are filtered. 
-	 * 
+	 * Applies a Filter transformation on a {@link DataSet}.
+	 *
+	 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichFilterFunction} for each element of the DataSet
+	 * and retains only those element for which the function returns true. Elements for
+	 * which the function returns false are filtered.
+	 *
 	 * @param filter The FilterFunction that is called for each element of the DataSet.
 	 * @return A FilterOperator that represents the filtered DataSet.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.functions.RichFilterFunction
 	 * @see FilterOperator
 	 * @see DataSet
@@ -287,16 +288,18 @@ public abstract class DataSet<T> {
 		return new FilterOperator<>(this, clean(filter), Utils.getCallLocationName());
 	}
 
-	
 	// --------------------------------------------------------------------------------------------
 	//  Projections
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Applies a Project transformation on a {@link Tuple} {@link DataSet}.<br>
-	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b><br>
-	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.<br>
-	 * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
+	 * Applies a Project transformation on a {@link Tuple} {@link DataSet}.
+	 *
+	 * <p><b>Note: Only Tuple DataSets can be projected using field indexes.</b>
+	 *
+	 * <p>The transformation projects each Tuple of the DataSet onto a (sub)set of fields.
+	 *
+	 * <p>Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
 	 *
 	 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
 	 *
@@ -311,22 +314,23 @@ public abstract class DataSet<T> {
 	public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
 		return new Projection<>(this, fieldIndexes).projectTupleX();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Non-grouped aggregations
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Applies an Aggregate transformation on a non-grouped {@link Tuple} {@link DataSet}.<br>
-	 * <b>Note: Only Tuple DataSets can be aggregated.</b>
-	 * The transformation applies a built-in {@link Aggregations Aggregation} on a specified field 
-	 *   of a Tuple DataSet. Additional aggregation functions can be added to the resulting 
+	 * Applies an Aggregate transformation on a non-grouped {@link Tuple} {@link DataSet}.
+	 *
+	 * <p><b>Note: Only Tuple DataSets can be aggregated.</b>
+	 * The transformation applies a built-in {@link Aggregations Aggregation} on a specified field
+	 *   of a Tuple DataSet. Additional aggregation functions can be added to the resulting
 	 *   {@link AggregateOperator} by calling {@link AggregateOperator#and(Aggregations, int)}.
-	 * 
+	 *
 	 * @param agg The built-in aggregation function that is computed.
 	 * @param field The index of the Tuple field on which the aggregation function is applied.
-	 * @return An AggregateOperator that represents the aggregated DataSet. 
-	 * 
+	 * @return An AggregateOperator that represents the aggregated DataSet.
+	 *
 	 * @see Tuple
 	 * @see Aggregations
 	 * @see AggregateOperator
@@ -337,7 +341,7 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Syntactic sugar for aggregate (SUM, field)
+	 * Syntactic sugar for aggregate (SUM, field).
 	 * @param field The index of the Tuple field on which the aggregation function is applied.
 	 * @return An AggregateOperator that represents the summed DataSet.
 	 *
@@ -350,8 +354,8 @@ public abstract class DataSet<T> {
 	/**
 	 * Syntactic sugar for {@link #aggregate(Aggregations, int)} using {@link Aggregations#MAX} as
 	 * the aggregation function.
-	 * <p>
-	 * <strong>Note:</strong> This operation is not to be confused with {@link #maxBy(int...)},
+	 *
+	 * <p><strong>Note:</strong> This operation is not to be confused with {@link #maxBy(int...)},
 	 * which selects one element with maximum value at the specified field positions.
 	 *
 	 * @param field The index of the Tuple field on which the aggregation function is applied.
@@ -367,8 +371,8 @@ public abstract class DataSet<T> {
 	/**
 	 * Syntactic sugar for {@link #aggregate(Aggregations, int)} using {@link Aggregations#MIN} as
 	 * the aggregation function.
-	 * <p>
-	 * <strong>Note:</strong> This operation is not to be confused with {@link #minBy(int...)},
+	 *
+	 * <p><strong>Note:</strong> This operation is not to be confused with {@link #minBy(int...)},
 	 * which selects one element with the minimum value at the specified field positions.
 	 *
 	 * @param field The index of the Tuple field on which the aggregation function is applied.
@@ -395,7 +399,6 @@ public abstract class DataSet<T> {
 		return res.<Long> getAccumulatorResult(id);
 	}
 
-
 	/**
 	 * Convenience method to get the elements of a DataSet as a List.
 	 * As DataSet can contain a lot of data, this method should be used with caution.
@@ -405,7 +408,7 @@ public abstract class DataSet<T> {
 	public List<T> collect() throws Exception {
 		final String id = new AbstractID().toString();
 		final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
-		
+
 		this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
 		JobExecutionResult res = getExecutionEnvironment().execute();
 
@@ -424,14 +427,15 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Applies a Reduce transformation on a non-grouped {@link DataSet}.<br>
-	 * The transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction}
+	 * Applies a Reduce transformation on a non-grouped {@link DataSet}.
+	 *
+	 * <p>The transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction}
 	 *   until only a single element remains which is the result of the transformation.
 	 * A ReduceFunction combines two elements into one new element of the same type.
-	 * 
+	 *
 	 * @param reducer The ReduceFunction that is applied on the DataSet.
 	 * @return A ReduceOperator that represents the reduced DataSet.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.functions.RichReduceFunction
 	 * @see ReduceOperator
 	 * @see DataSet
@@ -442,16 +446,17 @@ public abstract class DataSet<T> {
 		}
 		return new ReduceOperator<>(this, clean(reducer), Utils.getCallLocationName());
 	}
-	
+
 	/**
-	 * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.<br>
-	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet.
+	 * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.
+	 *
+	 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet.
 	 * The GroupReduceFunction can iterate over all elements of the DataSet and emit any
 	 *   number of output elements including none.
-	 * 
+	 *
 	 * @param reducer The GroupReduceFunction that is applied on the DataSet.
 	 * @return A GroupReduceOperator that represents the reduced DataSet.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.functions.RichGroupReduceFunction
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 	 * @see DataSet
@@ -460,7 +465,7 @@ public abstract class DataSet<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		
+
 		String callLocation = Utils.getCallLocationName();
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, getType(), callLocation, true);
 		return new GroupReduceOperator<>(this, resultType, clean(reducer), callLocation);
@@ -490,99 +495,100 @@ public abstract class DataSet<T> {
 
 	/**
 	 * Selects an element with minimum value.
-	 * <p>
-	 * The minimum is computed over the specified fields in lexicographical order.
-	 * <p>
-	 * <strong>Example 1</strong>: Given a data set with elements <code>[0, 1], [1, 0]</code>, the
+	 *
+	 * <p>The minimum is computed over the specified fields in lexicographical order.
+	 *
+	 * <p><strong>Example 1</strong>: Given a data set with elements <code>[0, 1], [1, 0]</code>, the
 	 * results will be:
 	 * <ul>
 	 * <li><code>minBy(0)</code>: <code>[0, 1]</code></li>
 	 * <li><code>minBy(1)</code>: <code>[1, 0]</code></li>
 	 * </ul>
-	 * <p>
-	 * <strong>Example 2</strong>: Given a data set with elements <code>[0, 0], [0, 1]</code>, the
+	 *
+	 * <p><strong>Example 2</strong>: Given a data set with elements <code>[0, 0], [0, 1]</code>, the
 	 * results will be:
 	 * <ul>
 	 * <li><code>minBy(0, 1)</code>: <code>[0, 0]</code></li>
 	 * </ul>
-	 * <p>
-	 * If multiple values with minimum value at the specified fields exist, a random one will be
+	 *
+	 * <p>If multiple values with minimum value at the specified fields exist, a random one will be
 	 * picked.
-	 * <p>
-	 * Internally, this operation is implemented as a {@link ReduceFunction}.
+	 *
+	 * <p>Internally, this operation is implemented as a {@link ReduceFunction}.
 	 *
 	 * @param fields Field positions to compute the minimum over
 	 * @return A {@link ReduceOperator} representing the minimum
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public ReduceOperator<T> minBy(int... fields)  {
-		if(!getType().isTupleType()) {
+		if (!getType().isTupleType()) {
 			throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.");
 		}
 
 		return new ReduceOperator<>(this, new SelectByMinFunction(
-				(TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
+			(TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Selects an element with maximum value.
-	 * <p>
-	 * The maximum is computed over the specified fields in lexicographical order.
-	 * <p>
-	 * <strong>Example 1</strong>: Given a data set with elements <code>[0, 1], [1, 0]</code>, the
+	 *
+	 * <p>The maximum is computed over the specified fields in lexicographical order.
+	 *
+	 * <p><strong>Example 1</strong>: Given a data set with elements <code>[0, 1], [1, 0]</code>, the
 	 * results will be:
 	 * <ul>
 	 * <li><code>maxBy(0)</code>: <code>[1, 0]</code></li>
 	 * <li><code>maxBy(1)</code>: <code>[0, 1]</code></li>
 	 * </ul>
-	 * <p>
-	 * <strong>Example 2</strong>: Given a data set with elements <code>[0, 0], [0, 1]</code>, the
+	 *
+	 * <p><strong>Example 2</strong>: Given a data set with elements <code>[0, 0], [0, 1]</code>, the
 	 * results will be:
 	 * <ul>
 	 * <li><code>maxBy(0, 1)</code>: <code>[0, 1]</code></li>
 	 * </ul>
-	 * <p>
-	 * If multiple values with maximum value at the specified fields exist, a random one will be
+	 *
+	 * <p>If multiple values with maximum value at the specified fields exist, a random one will be
 	 * picked.
-	 * <p>
-	 * Internally, this operation is implemented as a {@link ReduceFunction}.
+	 *
+	 * <p>Internally, this operation is implemented as a {@link ReduceFunction}.
 	 *
 	 * @param fields Field positions to compute the maximum over
 	 * @return A {@link ReduceOperator} representing the maximum
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public ReduceOperator<T> maxBy(int... fields)  {
-		if(!getType().isTupleType()) {
+	public ReduceOperator<T> maxBy(int... fields) {
+		if (!getType().isTupleType()) {
 			throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.");
 		}
 
 		return new ReduceOperator<>(this, new SelectByMaxFunction(
-				(TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
+			(TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
 	}
 
 	/**
-	 * Returns a new set containing the first n elements in this {@link DataSet}.<br>
+	 * Returns a new set containing the first n elements in this {@link DataSet}.
+	 *
 	 * @param n The desired number of elements.
 	 * @return A ReduceGroupOperator that represents the DataSet containing the elements.
-	*/
+	 */
 	public GroupReduceOperator<T, T> first(int n) {
-		if(n < 1) {
+		if (n < 1) {
 			throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
 		}
-		
+
 		return reduceGroup(new FirstReducer<T>(n));
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  distinct
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns a distinct set of a {@link DataSet} using a {@link KeySelector} function.
-	 * <p>
-	 * The KeySelector function is called for each element of the DataSet and extracts a single key value on which the
+	 *
+	 * <p>The KeySelector function is called for each element of the DataSet and extracts a single key value on which the
 	 * decision is made if two items are distinct or not.
-	 *  
+	 *
 	 * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which the
 	 *                     distinction of the DataSet is decided.
 	 * @return A DistinctOperator that represents the distinct DataSet.
@@ -591,26 +597,26 @@ public abstract class DataSet<T> {
 		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
 		return new DistinctOperator<>(this, new Keys.SelectorFunctionKeys<>(keyExtractor, getType(), keyType), Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Returns a distinct set of a {@link Tuple} {@link DataSet} using field position keys.
-	 * <p>
-	 * The field position keys specify the fields of Tuples on which the decision is made if two Tuples are distinct or
+	 *
+	 * <p>The field position keys specify the fields of Tuples on which the decision is made if two Tuples are distinct or
 	 * not.
-	 * <p>
-	 * Note: Field position keys can only be specified for Tuple DataSets.
 	 *
-	 * @param fields One or more field positions on which the distinction of the DataSet is decided. 
+	 * <p>Note: Field position keys can only be specified for Tuple DataSets.
+	 *
+	 * @param fields One or more field positions on which the distinction of the DataSet is decided.
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct(int... fields) {
 		return new DistinctOperator<>(this, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Returns a distinct set of a {@link DataSet} using expression keys.
-	 * <p>
-	 * The field expression keys specify the fields of a {@link org.apache.flink.api.common.typeutils.CompositeType}
+	 *
+	 * <p>The field expression keys specify the fields of a {@link org.apache.flink.api.common.typeutils.CompositeType}
 	 * (e.g., Tuple or Pojo type) on which the decision is made if two elements are distinct or not.
 	 * In case of a {@link org.apache.flink.api.common.typeinfo.AtomicType}, only the wildcard expression ("*") is valid.
 	 *
@@ -620,39 +626,40 @@ public abstract class DataSet<T> {
 	public DistinctOperator<T> distinct(String... fields) {
 		return new DistinctOperator<>(this, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Returns a distinct set of a {@link DataSet}.
-	 * <p>
-	 * If the input is a {@link org.apache.flink.api.common.typeutils.CompositeType} (Tuple or Pojo type),
+	 *
+	 * <p>If the input is a {@link org.apache.flink.api.common.typeutils.CompositeType} (Tuple or Pojo type),
 	 * distinct is performed on all fields and each field must be a key type
-	 * 
+	 *
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct() {
 		return new DistinctOperator<>(this, null, Utils.getCallLocationName());
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Grouping
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Groups a {@link DataSet} using a {@link KeySelector} function. 
-	 * The KeySelector function is called for each element of the DataSet and extracts a single 
-	 *   key value on which the DataSet is grouped. <br>
-	 * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation 
-	 *   can be applied. 
+	 * Groups a {@link DataSet} using a {@link KeySelector} function.
+	 * The KeySelector function is called for each element of the DataSet and extracts a single
+	 *   key value on which the DataSet is grouped.
+	 *
+	 * <p>This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
+	 *   can be applied.
 	 * <ul>
-	 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
+	 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
 	 *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
 	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
 	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
 	 * </ul>
-	 *  
-	 * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped. 
+	 *
+	 * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped.
 	 * @return An UnsortedGrouping on which a transformation needs to be applied to obtain a transformed DataSet.
-	 * 
+	 *
 	 * @see KeySelector
 	 * @see UnsortedGrouping
 	 * @see AggregateOperator
@@ -664,23 +671,25 @@ public abstract class DataSet<T> {
 		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
 		return new UnsortedGrouping<>(this, new Keys.SelectorFunctionKeys<>(clean(keyExtractor), getType(), keyType));
 	}
-	
+
 	/**
-	 * Groups a {@link Tuple} {@link DataSet} using field position keys.<br> 
-	 * <b>Note: Field position keys only be specified for Tuple DataSets.</b><br>
-	 * The field position keys specify the fields of Tuples on which the DataSet is grouped.
-	 * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation 
-	 *   can be applied. 
+	 * Groups a {@link Tuple} {@link DataSet} using field position keys.
+	 *
+	 * <p><b>Note: Field position keys only be specified for Tuple DataSets.</b>
+	 *
+	 * <p>The field position keys specify the fields of Tuples on which the DataSet is grouped.
+	 * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
+	 *   can be applied.
 	 * <ul>
-	 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
+	 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
 	 *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
 	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
 	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
-	 * </ul> 
-	 * 
-	 * @param fields One or more field positions on which the DataSet will be grouped. 
+	 * </ul>
+	 *
+	 * @param fields One or more field positions on which the DataSet will be grouped.
 	 * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
-	 * 
+	 *
 	 * @see Tuple
 	 * @see UnsortedGrouping
 	 * @see AggregateOperator
@@ -718,44 +727,46 @@ public abstract class DataSet<T> {
 	public UnsortedGrouping<T> groupBy(String... fields) {
 		return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Joining
 	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Initiates a Join transformation. <br>
-	 * A Join transformation joins the elements of two 
-	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.<br>
-	 * 
-	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+
+	/**
+	 * Initiates a Join transformation.
+	 *
+	 * <p>A Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
-	 *  
+	 *
 	 * @param other The other DataSet with which this DataSet is joined.
 	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
-	 * 
+	 *
 	 * @see JoinOperatorSets
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
 		return new JoinOperatorSets<>(this, other);
 	}
-	
+
 	/**
-	 * Initiates a Join transformation. <br>
-	 * A Join transformation joins the elements of two 
-	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.<br>
-	 * 
-	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+	 * Initiates a Join transformation.
+	 *
+	 * <p>A Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
-	 *  
+	 *
 	 * @param other The other DataSet with which this DataSet is joined.
 	 * @param strategy The strategy that should be used execute the join. If {@code null} is given, then the
 	 *                 optimizer will pick the join strategy.
 	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
-	 * 
+	 *
 	 * @see JoinOperatorSets
 	 * @see DataSet
 	 */
@@ -764,39 +775,45 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Initiates a Join transformation. <br>
-	 * A Join transformation joins the elements of two 
-	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.<br>
-	 * This method also gives the hint to the optimizer that the second DataSet to join is much
-	 *   smaller than the first one.<br>
-	 * This method returns a {@link JoinOperatorSets} on which
+	 * Initiates a Join transformation.
+	 *
+	 * <p>A Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>This method also gives the hint to the optimizer that the second DataSet to join is much
+	 *   smaller than the first one.
+	 *
+	 * <p>This method returns a {@link JoinOperatorSets} on which
 	 *   {@link JoinOperatorSets#where(String...)} needs to be called to define the join key of the first
 	 *   joining (i.e., this) DataSet.
-	 *  
+	 *
 	 * @param other The other DataSet with which this DataSet is joined.
 	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
-	 * 
+	 *
 	 * @see JoinOperatorSets
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
 		return new JoinOperatorSets<>(this, other, JoinHint.BROADCAST_HASH_SECOND);
 	}
-	
+
 	/**
-	 * Initiates a Join transformation.<br>
-	 * A Join transformation joins the elements of two 
-	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.<br>
-	 * This method also gives the hint to the optimizer that the second DataSet to join is much
-	 *   larger than the first one.<br>
-	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+	 * Initiates a Join transformation.
+	 *
+	 * <p>A Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>This method also gives the hint to the optimizer that the second DataSet to join is much
+	 *   larger than the first one.
+	 *
+	 * <p>This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
-	 *  
+	 *
 	 * @param other The other DataSet with which this DataSet is joined.
 	 * @return A JoinOperatorSet to continue the definition of the Join transformation.
-	 * 
+	 *
 	 * @see JoinOperatorSets
 	 * @see DataSet
 	 */
@@ -805,11 +822,13 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Initiates a Left Outer Join transformation.<br>
-	 * An Outer Join transformation joins two elements of two
+	 * Initiates a Left Outer Join transformation.
+	 *
+	 * <p>An Outer Join transformation joins two elements of two
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
-	 *   joining elements into one DataSet.<br>
-	 * Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
 	 *   element on the other side are joined with {@code null} and emitted to the
 	 *   resulting DataSet.
 	 *
@@ -824,11 +843,13 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Initiates a Left Outer Join transformation.<br>
-	 * An Outer Join transformation joins two elements of two
+	 * Initiates a Left Outer Join transformation.
+	 *
+	 * <p>An Outer Join transformation joins two elements of two
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
-	 *   joining elements into one DataSet.<br>
-	 * Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
 	 *   element on the other side are joined with {@code null} and emitted to the
 	 *   resulting DataSet.
 	 *
@@ -849,17 +870,19 @@ public abstract class DataSet<T> {
 			case BROADCAST_HASH_SECOND:
 				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
 			default:
-				throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: "+strategy);
+				throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: " + strategy);
 		}
 
 	}
 
 	/**
-	 * Initiates a Right Outer Join transformation.<br>
-	 * An Outer Join transformation joins two elements of two
+	 * Initiates a Right Outer Join transformation.
+	 *
+	 * <p>An Outer Join transformation joins two elements of two
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
-	 *   joining elements into one DataSet.<br>
-	 * Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
 	 *   element on {@code this} side are joined with {@code null} and emitted to the
 	 *   resulting DataSet.
 	 *
@@ -874,11 +897,13 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Initiates a Right Outer Join transformation.<br>
-	 * An Outer Join transformation joins two elements of two
+	 * Initiates a Right Outer Join transformation.
+	 *
+	 * <p>An Outer Join transformation joins two elements of two
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
-	 *   joining elements into one DataSet.<br>
-	 * Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
 	 *   element on {@code this} side are joined with {@code null} and emitted to the
 	 *   resulting DataSet.
 	 *
@@ -899,16 +924,18 @@ public abstract class DataSet<T> {
 			case BROADCAST_HASH_FIRST:
 				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
 			default:
-			throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: "+strategy);
+				throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: " + strategy);
 		}
 	}
 
 	/**
-	 * Initiates a Full Outer Join transformation.<br>
-	 * An Outer Join transformation joins two elements of two
+	 * Initiates a Full Outer Join transformation.
+	 *
+	 * <p>An Outer Join transformation joins two elements of two
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
-	 *   joining elements into one DataSet.<br>
-	 * Elements of <b>both</b> DataSets that do not have a matching
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>Elements of <b>both</b> DataSets that do not have a matching
 	 *   element on the opposing side are joined with {@code null} and emitted to the
 	 *   resulting DataSet.
 	 *
@@ -923,11 +950,13 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Initiates a Full Outer Join transformation.<br>
-	 * An Outer Join transformation joins two elements of two
+	 * Initiates a Full Outer Join transformation.
+	 *
+	 * <p>An Outer Join transformation joins two elements of two
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
-	 *   joining elements into one DataSet.<br>
-	 * Elements of <b>both</b> DataSets that do not have a matching
+	 *   joining elements into one DataSet.
+	 *
+	 * <p>Elements of <b>both</b> DataSets that do not have a matching
 	 *   element on the opposing side are joined with {@code null} and emitted to the
 	 *   resulting DataSet.
 	 *
@@ -947,30 +976,32 @@ public abstract class DataSet<T> {
 			case REPARTITION_HASH_SECOND:
 				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
 			default:
-			throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy);
+				throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: " + strategy);
 		}
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 	//  Co-Grouping
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Initiates a CoGroup transformation.<br>
-	 * A CoGroup transformation combines the elements of
-	 *   two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and 
+	 * Initiates a CoGroup transformation.
+	 *
+	 * <p>A CoGroup transformation combines the elements of
+	 *   two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and
 	 *   gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.common.functions.RichCoGroupFunction}.
 	 *   If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction
-	 *   is called with an empty group for the non-existing group.<br>
-	 * The CoGroupFunction can iterate over the elements of both groups and return any number 
-	 *   of elements including none.<br>
-	 * This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods
+	 *   is called with an empty group for the non-existing group.
+	 *
+	 * <p>The CoGroupFunction can iterate over the elements of both groups and return any number
+	 *   of elements including none.
+	 *
+	 * <p>This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
-	 * 
+	 *
 	 * @param other The other DataSet of the CoGroup transformation.
 	 * @return A CoGroupOperatorSets to continue the definition of the CoGroup transformation.
-	 * 
+	 *
 	 * @see CoGroupOperatorSets
 	 * @see CoGroupOperator
 	 * @see DataSet
@@ -984,37 +1015,39 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Continues a Join transformation and defines the {@link Tuple} fields of the second join 
-	 * {@link DataSet} that should be used as join keys.<br>
-	 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br>
-	 * 
-	 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with 
-	 * the element of the first input being the first field of the tuple and the element of the 
-	 * second input being the second field of the tuple. 
-	 * 
+	 * Continues a Join transformation and defines the {@link Tuple} fields of the second join
+	 * {@link DataSet} that should be used as join keys.
+	 *
+	 * <p><b>Note: Fields can only be selected as join keys on Tuple DataSets.</b>
+	 *
+	 * <p>The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+	 * the element of the first input being the first field of the tuple and the element of the
+	 * second input being the second field of the tuple.
+	 *
 	 * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
 	 * @return A DefaultJoin that represents the joined DataSet.
 	 */
-	
+
 	/**
-	 * Initiates a Cross transformation.<br>
-	 * A Cross transformation combines the elements of two 
-	 *   {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of 
+	 * Initiates a Cross transformation.
+	 *
+	 * <p>A Cross transformation combines the elements of two
+	 *   {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of
 	 *   both DataSets, i.e., it builds a Cartesian product.
-	 * 
-	 * <p>
-	 * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
-	 * the element of the first input being the first field of the tuple and the element of the 
+	 *
+	 *
+	 * <p>The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
+	 * the element of the first input being the first field of the tuple and the element of the
 	 * second input being the second field of the tuple.
-	 * 
-	 * <p>
-	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 *
+	 *
+	 * <p>Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
-	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br>
-	 * 
-	 * @param other The other DataSet with which this DataSet is crossed. 
+	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
+	 *
+	 * @param other The other DataSet with which this DataSet is crossed.
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
 	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
@@ -1023,28 +1056,29 @@ public abstract class DataSet<T> {
 	public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other) {
 		return new CrossOperator.DefaultCross<>(this, other, CrossHint.OPTIMIZER_CHOOSES, Utils.getCallLocationName());
 	}
-	
+
 	/**
-	 * Initiates a Cross transformation.<br>
-	 * A Cross transformation combines the elements of two 
-	 *   {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of 
+	 * Initiates a Cross transformation.
+	 *
+	 * <p>A Cross transformation combines the elements of two
+	 *   {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of
 	 *   both DataSets, i.e., it builds a Cartesian product.
 	 * This method also gives the hint to the optimizer that the second DataSet to cross is much
 	 *   smaller than the first one.
-	 *   
-	 * <p>
-	 * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
-	 * the element of the first input being the first field of the tuple and the element of the 
+	 *
+	 *
+	 * <p>The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
+	 * the element of the first input being the first field of the tuple and the element of the
 	 * second input being the second field of the tuple.
-	 *   
-	 * <p>
-	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 *
+	 *
+	 * <p>Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
-	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br>
-	 * 
-	 * @param other The other DataSet with which this DataSet is crossed. 
+	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
+	 *
+	 * @param other The other DataSet with which this DataSet is crossed.
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
 	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
@@ -1053,28 +1087,29 @@ public abstract class DataSet<T> {
 	public <R> CrossOperator.DefaultCross<T, R> crossWithTiny(DataSet<R> other) {
 		return new CrossOperator.DefaultCross<>(this, other, CrossHint.SECOND_IS_SMALL, Utils.getCallLocationName());
 	}
-	
+
 	/**
-	 * Initiates a Cross transformation.<br>
-	 * A Cross transformation combines the elements of two 
-	 *   {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of 
+	 * Initiates a Cross transformation.
+	 *
+	 * <p>A Cross transformation combines the elements of two
+	 *   {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of
 	 *   both DataSets, i.e., it builds a Cartesian product.
 	 * This method also gives the hint to the optimizer that the second DataSet to cross is much
 	 *   larger than the first one.
-	 *   
-	 * <p>
-	 * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
-	 * the element of the first input being the first field of the tuple and the element of the 
+	 *
+	 *
+	 * <p>The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
+	 * the element of the first input being the first field of the tuple and the element of the
 	 * second input being the second field of the tuple.
-	 *   
-	 * <p>
-	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 *
+	 * <p>Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
-	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br>
-	 * 
-	 * @param other The other DataSet with which this DataSet is crossed. 
+	 * each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.
+	 *
+	 * @param other The other DataSet with which this DataSet is crossed.
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
 	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
@@ -1094,13 +1129,13 @@ public abstract class DataSet<T> {
 	 * given to the {@code closeWith(DataSet)} method is the data set that will be fed back and used as the input
 	 * to the next iteration. The return value of the {@code closeWith(DataSet)} method is the resulting
 	 * data set after the iteration has terminated.
-	 * <p>
-	 * An example of an iterative computation is as follows:
+	 *
+	 * <p>An example of an iterative computation is as follows:
 	 *
 	 * <pre>
 	 * {@code
 	 * DataSet<Double> input = ...;
-	 * 
+	 *
 	 * DataSet<Double> startOfIteration = input.iterate(10);
 	 * DataSet<Double> toBeFedBack = startOfIteration
 	 *                               .map(new MyMapper())
@@ -1108,20 +1143,20 @@ public abstract class DataSet<T> {
 	 * DataSet<Double> result = startOfIteration.closeWith(toBeFedBack);
 	 * }
 	 * </pre>
-	 * <p>
-	 * The iteration has a maximum number of times that it executes. A dynamic termination can be realized by using a
+	 *
+	 * <p>The iteration has a maximum number of times that it executes. A dynamic termination can be realized by using a
 	 * termination criterion (see {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet, DataSet)}).
-	 * 
+	 *
 	 * @param maxIterations The maximum number of times that the iteration is executed.
 	 * @return An IterativeDataSet that marks the start of the iterative part and needs to be closed by
 	 *         {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet)}.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.operators.IterativeDataSet
 	 */
 	public IterativeDataSet<T> iterate(int maxIterations) {
 		return new IterativeDataSet<>(getExecutionEnvironment(), getType(), this, maxIterations);
 	}
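
A self-contained variant of the bulk-iteration example above, with the elided MyMapper replaced by an inline MapFunction (the halving step is purely illustrative):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Double> input = env.fromElements(1.0, 2.0, 3.0);

    IterativeDataSet<Double> startOfIteration = input.iterate(10);
    DataSet<Double> toBeFedBack = startOfIteration
            .map(new MapFunction<Double, Double>() {
                @Override
                public Double map(Double value) {
                    return value / 2.0; // applied once per iteration step
                }
            });
    DataSet<Double> result = startOfIteration.closeWith(toBeFedBack);
    result.print();
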
-	
+
 	/**
 	 * Initiates a delta iteration. A delta iteration is similar to a regular iteration (as started by {@link #iterate(int)},
 	 * but maintains state across the individual iteration steps. The Solution set, which represents the current state
@@ -1130,48 +1165,48 @@ public abstract class DataSet<T> {
 	 * can be obtained via {@link org.apache.flink.api.java.operators.DeltaIteration#getWorkset()}.
 	 * The solution set is updated by producing a delta for it, which is merged into the solution set at the end of each
 	 * iteration step.
-	 * <p>
-	 * The delta iteration must be closed by calling {@link org.apache.flink.api.java.operators.DeltaIteration#closeWith(DataSet, DataSet)}. The two
+	 *
+	 * <p>The delta iteration must be closed by calling {@link org.apache.flink.api.java.operators.DeltaIteration#closeWith(DataSet, DataSet)}. The two
 	 * parameters are the delta for the solution set and the new workset (the data set that will be fed back).
 	 * The return value of the {@code closeWith(DataSet, DataSet)} method is the resulting
 	 * data set after the iteration has terminated. Delta iterations terminate when the feed back data set
 	 * (the workset) is empty. In addition, a maximum number of steps is given as a fall back termination guard.
-	 * <p>
-	 * Elements in the solution set are uniquely identified by a key. When merging the solution set delta, contained elements
+	 *
+	 * <p>Elements in the solution set are uniquely identified by a key. When merging the solution set delta, contained elements
 	 * with the same key are replaced.
-	 * <p>
-	 * <b>NOTE:</b> Delta iterations currently support only tuple valued data types. This restriction
+	 *
+	 * <p><b>NOTE:</b> Delta iterations currently support only tuple valued data types. This restriction
 	 * will be removed in the future. The key is specified by the tuple position.
-	 * <p>
-	 * A code example for a delta iteration is as follows
+	 *
+	 * <p>A code example for a delta iteration is as follows
 	 * <pre>
 	 * {@code
 	 * DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
 	 *                                                  initialState.iterateDelta(initialFeedbackSet, 100, 0);
-	 * 
+	 *
 	 * DataSet<Tuple2<Long, Long>> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1)
 	 *                                              .join(iteration.getSolutionSet()).where(0).equalTo(0)
 	 *                                              .flatMap(new ProjectAndFilter());
-	 *                                              
+	 *
 	 * DataSet<Tuple2<Long, Long>> feedBack = delta.join(someOtherSet).where(...).equalTo(...).with(...);
-	 * 
+	 *
 	 * // close the delta iteration (delta and new workset are identical)
 	 * DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, feedBack);
 	 * }
 	 * </pre>
-	 * 
+	 *
 	 * @param workset The initial version of the data set that is fed back to the next iteration step (the workset).
 	 * @param maxIterations The maximum number of iteration steps, as a fall back safeguard.
 	 * @param keyPositions The position of the tuple fields that is used as the key of the solution set.
-	 * 
+	 *
 	 * @return The DeltaIteration that marks the start of a delta iteration.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.operators.DeltaIteration
 	 */
 	public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyPositions) {
 		Preconditions.checkNotNull(workset);
 		Preconditions.checkNotNull(keyPositions);
-		
+
 		Keys.ExpressionKeys<T> keys = new Keys.ExpressionKeys<>(keyPositions, getType());
 		return new DeltaIteration<>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations);
 	}
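
A compact sketch of a delta iteration (the sample data, the +1 update, and reusing the delta as the next workset are illustrative choices; field 0 serves as the solution-set key, and an ExecutionEnvironment env is assumed):

    DataSet<Tuple2<Long, Long>> initialState = env.fromElements(
            new Tuple2<>(1L, 10L), new Tuple2<>(2L, 20L));
    DataSet<Tuple2<Long, Long>> initialWorkset = env.fromElements(
            new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L));

    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            initialState.iterateDelta(initialWorkset, 100, 0);

    // Compute the solution-set delta from the current workset; entries with the same key replace old ones.
    DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
            .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                    return new Tuple2<>(value.f0, value.f1 + 1);
                }
            });

    // Close with the delta and the next workset (here the same set); terminates after at most 100 steps.
    DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, delta);
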
@@ -1179,12 +1214,11 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	//  Custom Operators
 	// -------------------------------------------------------------------------------------------
-	
 
 	/**
 	 * Runs a {@link CustomUnaryOperation} on the data set. Custom operations are typically complex
 	 * operators that are composed of multiple steps.
-	 * 
+	 *
 	 * @param operation The operation to run.
 	 * @return The data set produced by the operation.
 	 */
@@ -1193,14 +1227,14 @@ public abstract class DataSet<T> {
 		operation.setInput(this);
 		return operation.createResult();
 	}
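
A minimal sketch of a CustomUnaryOperation; UppercaseOperation and the input DataSet lines are made up for illustration:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.operators.CustomUnaryOperation;

    public class UppercaseOperation implements CustomUnaryOperation<String, String> {

        private DataSet<String> input;

        @Override
        public void setInput(DataSet<String> inputData) {
            this.input = inputData;
        }

        @Override
        public DataSet<String> createResult() {
            // a "complex" operation would compose several transformations here
            return input.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            });
        }
    }

    // usage
    DataSet<String> upper = lines.runOperation(new UppercaseOperation());
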
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Union
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Creates a union of this DataSet with an other DataSet. The other DataSet must be of the same data type.
-	 * 
+	 *
 	 * @param other The other DataSet which is unioned with the current DataSet.
 	 * @return The resulting DataSet.
 	 */
@@ -1211,39 +1245,39 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	//  Partitioning
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Hash-partitions a DataSet on the specified key fields.
-	 * <p>
-	 * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time.
-	 * 
+	 *
+	 * <p><b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+	 *
 	 * @param fields The field indexes on which the DataSet is hash-partitioned.
 	 * @return The partitioned DataSet.
 	 */
 	public PartitionOperator<T> partitionByHash(int... fields) {
 		return new PartitionOperator<>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName());
 	}
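
A sketch of hash partitioning followed by a partition-wise operation (the data and the pass-through mapPartition are illustrative; an ExecutionEnvironment env and the usual imports, including Collector, are assumed):

    DataSet<Tuple2<String, Integer>> data = env.fromElements(
            new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3));

    DataSet<Tuple2<String, Integer>> partitioned = data
            .partitionByHash(0) // all records with the same first field end up in the same partition
            .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public void mapPartition(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
                    for (Tuple2<String, Integer> value : values) {
                        out.collect(value);
                    }
                }
            });
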
-	
+
 	/**
 	 * Hash-partitions a DataSet on the specified key fields.
-	 * <p>
-	 * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time.
-	 * 
+	 *
+	 * <p><b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+	 *
 	 * @param fields The field expressions on which the DataSet is hash-partitioned.
 	 * @return The partitioned DataSet.
 	 */
 	public PartitionOperator<T> partitionByHash(String... fields) {
 		return new PartitionOperator<>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Partitions a DataSet using the specified KeySelector.
-	 * <p>
-	 * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time.
-	 * 
+	 *
+	 * <p><b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+	 *
 	 * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned.
 	 * @return The partitioned DataSet.
-	 * 
+	 *
 	 * @see KeySelector
 	 */
 	public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelector<T, K> keyExtractor) {
@@ -1253,8 +1287,8 @@ public abstract class DataSet<T> {
 
 	/**
 	 * Range-partitions a DataSet on the specified key fields.
-	 * <p>
-	 * <b>Important:</b>This operation requires an extra pass over the DataSet to compute the range boundaries and
+	 *
+	 * <p><b>Important:</b> This operation requires an extra pass over the DataSet to compute the range boundaries and
 	 * shuffles the whole DataSet over the network. This can take significant amount of time.
 	 *
 	 * @param fields The field indexes on which the DataSet is range-partitioned.
@@ -1266,8 +1300,8 @@ public abstract class DataSet<T> {
 
 	/**
 	 * Range-partitions a DataSet on the specified key fields.
-	 * <p>
-	 * <b>Important:</b>This operation requires an extra pass over the DataSet to compute the range boundaries and
+	 * <p><b>Important:</b> This operation requires an extra pass over the DataSet to compute the range boundaries and
+	 * <p><b>Important:</b>This operation requires an extra pass over the DataSet to compute the range boundaries and
 	 * shuffles the whole DataSet over the network. This can take significant amount of time.
 	 *
 	 * @param fields The field expressions on which the DataSet is range-partitioned.
@@ -1279,8 +1313,8 @@ public abstract class DataSet<T> {
 
 	/**
 	 * Range-partitions a DataSet using the specified KeySelector.
-	 * <p>
-	 * <b>Important:</b>This operation requires an extra pass over the DataSet to compute the range boundaries and
+	 *
+	 * <p><b>Important:</b> This operation requires an extra pass over the DataSet to compute the range boundaries and
 	 * shuffles the whole DataSet over the network. This can take significant amount of time.
 	 *
 	 * @param keyExtractor The KeyExtractor with which the DataSet is range-partitioned.
@@ -1296,9 +1330,9 @@ public abstract class DataSet<T> {
 	/**
 	 * Partitions a tuple DataSet on the specified key fields using a custom partitioner.
 	 * This method takes the key position to partition on, and a partitioner that accepts the key type.
-	 * <p> 
-	 * Note: This method works only on single field keys.
-	 * 
+	 *
+	 * <p>Note: This method works only on single field keys.
+	 *
 	 * @param partitioner The partitioner to assign partitions to keys.
 	 * @param field The field index on which the DataSet is to be partitioned.
 	 * @return The partitioned DataSet.
@@ -1306,13 +1340,13 @@ public abstract class DataSet<T> {
 	public <K> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, int field) {
 		return new PartitionOperator<>(this, new Keys.ExpressionKeys<>(new int[] {field}, getType()), clean(partitioner), Utils.getCallLocationName());
 	}
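
A sketch of a custom Partitioner on a single-field key (the routing rule is an arbitrary example; an ExecutionEnvironment env is assumed):

    DataSet<Tuple2<Integer, String>> data = env.fromElements(
            new Tuple2<>(7, "small"), new Tuple2<>(250, "large"));

    DataSet<Tuple2<Integer, String>> routed = data.partitionCustom(
            new Partitioner<Integer>() {
                @Override
                public int partition(Integer key, int numPartitions) {
                    // keys below 100 go to partition 0, the rest to partition 1 (if it exists)
                    return (key < 100 ? 0 : 1) % numPartitions;
                }
            }, 0);
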
-	
+
 	/**
 	 * Partitions a POJO DataSet on the specified key fields using a custom partitioner.
 	 * This method takes the key expression to partition on, and a partitioner that accepts the key type.
-	 * <p>
-	 * Note: This method works only on single field keys.
-	 * 
+	 *
+	 * <p>Note: This method works only on single field keys.
+	 *
 	 * @param partitioner The partitioner to assign partitions to keys.
 	 * @param field The field expression on which the DataSet is to be partitioned.
 	 * @return The partitioned DataSet.
@@ -1320,32 +1354,32 @@ public abstract class DataSet<T> {
 	public <K> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, String field) {
 		return new PartitionOperator<>(this, new Keys.ExpressionKeys<>(new String[] {field}, getType()), clean(partitioner), Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Partitions a DataSet on the key returned by the selector, using a custom partitioner.
 	 * This method takes the key selector to get the key to partition on, and a partitioner that
 	 * accepts the key type.
-	 * <p>
-	 * Note: This method works only on single field keys, i.e. the selector cannot return tuples
+	 *
+	 * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
 	 * of fields.
-	 * 
+	 *
 	 * @param partitioner The partitioner to assign partitions to keys.
 	 * @param keyExtractor The KeyExtractor with which the DataSet is partitioned.
 	 * @return The partitioned DataSet.
-	 * 
+	 *
 	 * @see KeySelector
 	 */
 	public <K extends Comparable<K>> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keyExtractor) {
 		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
 		return new PartitionOperator<>(this, new Keys.SelectorFunctionKeys<>(keyExtractor, getType(), keyType), clean(partitioner), Utils.getCallLocationName());
 	}
-	
+
 	/**
-	 * Enforces a re-balancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the 
+	 * Enforces a re-balancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the
 	 * following task. This can help to improve performance in case of heavy data skew and compute intensive operations.
-	 * <p>
-	 * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time.
-	 * 
+	 *
+	 * <p><b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+	 *
 	 * @return The re-balanced DataSet.
 	 */
 	public PartitionOperator<T> rebalance() {
@@ -1384,7 +1418,7 @@ public abstract class DataSet<T> {
 	 * Locally sorts the partitions of the DataSet on the extracted key in the specified order.
 	 * The DataSet can be sorted on multiple values by returning a tuple from the KeySelector.
 	 *
-	 * Note that no additional sort keys can be appended to a KeySelector sort keys. To sort
+	 * <p>Note that no additional sort keys can be appended to KeySelector sort keys. To sort
 	 * the partitions by multiple values using KeySelector, the KeySelector must return a tuple
 	 * consisting of the values.
 	 *
@@ -1401,14 +1435,15 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	//  Top-K
 	// --------------------------------------------------------------------------------------------
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Result writing
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Writes a DataSet as text file(s) to the specified location.<br>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.<br/>
+	 * Writes a DataSet as text file(s) to the specified location.
+	 *
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.<br/>
 	 * <br/>
 	 * <span class="strong">Output files and directories</span><br/>
 	 * What output the writeAsText() method produces depends on other circumstances
@@ -1432,8 +1467,8 @@ public abstract class DataSet<T> {
 	 * <pre>{@code // Parallelism is set to only this particular operation
 	 *dataset.writeAsText("file:///path1").setParallelism(1);
 	 *
-	 * // This will creates the same effect but note all operators' parallelism are set to one 
-	 *env.setParallelism(1); 
+	 * // This creates the same effect, but note that all operators' parallelism is set to one
+	 *env.setParallelism(1);
 	 *...
 	 *dataset.writeAsText("file:///path1"); }</pre>
 	 *   </li>
@@ -1448,10 +1483,10 @@ public abstract class DataSet<T> {
 	 *dataset.writeAsText("file:///path1").setParallelism(1); }</pre>
 	 *   </li>
 	 * </ul>
-	 * 
+	 *
 	 * @param filePath The path pointing to the location the text file or files under the directory is written to.
 	 * @return The DataSink that writes the DataSet.
-	 * 
+	 *
 	 * @see TextOutputFormat
 	 */
 	public DataSink<T> writeAsText(String filePath) {
@@ -1459,13 +1494,14 @@ public abstract class DataSet<T> {
 	}
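
A usage sketch of the single-file case described above (the path is illustrative; file sinks only run once env.execute() is called, and WriteMode comes from FileSystem.WriteMode):

    DataSet<String> lines = env.fromElements("first", "second", "third");
    lines.writeAsText("file:///tmp/flink-out/lines", WriteMode.OVERWRITE).setParallelism(1);
    env.execute("writeAsText example");
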
 
 	/**
-	 * Writes a DataSet as text file(s) to the specified location.<br>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.  
-	 * 
+	 * Writes a DataSet as text file(s) to the specified location.
+	 *
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.
+	 *
 	 * @param filePath The path pointing to the location the text file is written to.
 	 * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE.
 	 * @return The DataSink that writes the DataSet.
-	 * 
+	 *
 	 * @see TextOutputFormat
 	 * @see DataSet#writeAsText(String) Output files and directories
 	 */
@@ -1476,8 +1512,9 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Writes a DataSet as text file(s) to the specified location.<br>
-	 * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
+	 * Writes a DataSet as text file(s) to the specified location.
+	 *
+	 * <p>For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
 	 *
 	 * @param filePath The path pointing to the location the text file is written to.
 	 * @param formatter formatter that is applied on every element of the DataSet.
@@ -1491,8 +1528,9 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Writes a DataSet as text file(s) to the specified location.<br>
-	 * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
+	 * Writes a DataSet as text file(s) to the specified location.
+	 *
+	 * <p>For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
 	 *
 	 * @param filePath The path pointing to the location the text file is written to.
 	 * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE.
@@ -1507,51 +1545,59 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.<br>
-	 * <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br>
-	 * For each Tuple field the result of {@link Object#toString()} is written.
-	 * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.<br>
-	 * Tuples are are separated by the newline character ({@code \n}).
-	 * 
+	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.
+	 *
+	 * <p><b>Note: Only a Tuple DataSet can be written as a CSV file.</b>
+	 *
+	 * <p>For each Tuple field the result of {@link Object#toString()} is written.
+	 * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
+	 *
+	 * <p>Tuples are separated by the newline character ({@code \n}).
+	 *
 	 * @param filePath The path pointing to the location the CSV file is written to.
 	 * @return The DataSink that writes the DataSet.
-	 * 
+	 *
 	 * @see Tuple
 	 * @see CsvOutputFormat
-	 * @see DataSet#writeAsText(String) Output files and directories 
+	 * @see DataSet#writeAsText(String) Output files and directories
 	 */
 	public DataSink<T> writeAsCsv(String filePath) {
 		return writeAsCsv(filePath, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
 	}
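
A sketch of the CSV variants (paths and data are illustrative; an ExecutionEnvironment env is assumed):

    DataSet<Tuple2<String, Integer>> records = env.fromElements(
            new Tuple2<>("alice", 42), new Tuple2<>("bob", 7));

    // default delimiters: '\n' between tuples, ',' between fields
    records.writeAsCsv("file:///tmp/flink-out/records.csv");

    // explicit delimiters, e.g. a pipe-separated file
    records.writeAsCsv("file:///tmp/flink-out/records.psv", "\n", "|");

    env.execute("writeAsCsv example");
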
 
 	/**
-	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.<br>
-	 * <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br>
-	 * For each Tuple field the result of {@link Object#toString()} is written.
-	 * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.<br>
-	 * Tuples are are separated by the newline character ({@code \n}).
-	 * 
+	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.
+	 *
+	 * <p><b>Note: Only a Tuple DataSet can be written as a CSV file.</b>
+	 *
+	 * <p>For each Tuple field the result of {@link Object#toString()} is written.
+	 * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
+	 *
+	 * <p>Tuples are separated by the newline character ({@code \n}).
+	 *
 	 * @param filePath The path pointing to the location the CSV file is written to.
 	 * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
 	 * @return The DataSink that writes the DataSet.
-	 * 
+	 *
 	 * @see Tuple
 	 * @see CsvOutputFormat
 	 * @see DataSet#writeAsText(String) Output files and directories
 	 */
 	public DataSink<T> writeAsCsv(String filePath, WriteMode writeMode) {
-		return internalWriteAsCsv(new Path(filePath),CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER, writeMode);
+		return internalWriteAsCsv(new Path(filePath), CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER, writeMode);
 	}
 
 	/**
-	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.<br>
-	 * <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br>
-	 * For each Tuple field the result of {@link Object#toString()} is written.
-	 * 
+	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
+	 *
+	 * <p><b>Note: Only a Tuple DataSet can be written as a CSV file.</b>
+	 *
+	 * <p>For each Tuple field the result of {@link Object#toString()} is written.
+	 *
 	 * @param filePath The path pointing to the location the CSV file is written to.
 	 * @param rowDelimiter The row delimiter to separate Tuples.
 	 * @param fieldDelimiter The field delimiter to separate Tuple fields.
-	 * 
+	 *
 	 * @see Tuple
 	 * @see CsvOutputFormat
 	 * @see DataSet#writeAsText(String) Output files and directories
@@ -1561,15 +1607,16 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.<br>
-	 * <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br>
-ยง	 * For each Tuple field the result of {@link Object#toString()} is written.
-	 * 
+	 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
+	 *
+	 * <p><b>Note: Only a Tuple DataSet can be written as a CSV file.</b>
+	 *
+	 * <p>For each Tuple field the result of {@link Object#toString()} is written.
+	 *
 	 * @param filePath The path pointing to the location the CSV file is written to.
 	 * @param rowDelimiter The row delimiter to separate Tuples.
 	 * @param fieldDelimiter The field delimiter to separate Tuple fields.
 	 * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
-	 * 
+	 *
 	 * @see Tuple
 	 * @see CsvOutputFormat
 	 * @see DataSet#writeAsText(String) Output files and directories
@@ -1577,27 +1624,27 @@ public abstract class DataSet<T> {
 	public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
 		return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode);
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
 		Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
-		if(wm != null) {
+		if (wm != null) {
 			of.setWriteMode(wm);
 		}
 		return output((OutputFormat<T>) of);
 	}
-	
+
 	/**
 	 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
 	 * the print() method. For programs that are executed in a cluster, this method needs
 	 * to gather the contents of the DataSet back to the client, to print it there.
-	 * 
-	 * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
-	 * 
+	 *
+	 * <p>The string written for each element is defined by the {@link Object#toString()} method.
+	 *
 	 * <p>This method immediately triggers the program execution, similar to the
-	 * {@link #collect()} and {@link #count()} methods.</p>
-	 * 
+	 * {@link #collect()} and {@link #count()} methods.
+	 *
 	 * @see #printToErr()
 	 * @see #printOnTaskManager(String)
 	 */
@@ -1613,11 +1660,11 @@ public abstract class DataSet<T> {
 	 * the print() method. For programs that are executed in a cluster, this method needs
 	 * to gather the contents of the DataSet back to the client, to print it there.
 	 *
-	 * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+	 * <p>The string written for each element is defined by the {@link Object#toString()} method.
 	 *
 	 * <p>This method immediately triggers the program execution, similar to the
-	 * {@link #collect()} and {@link #count()} methods.</p>
-	 * 
+	 * {@link #collect()} and {@link #count()} methods.
+	 *
 	 * @see #print()
 	 * @see #printOnTaskManager(String)
 	 */
@@ -1632,30 +1679,30 @@ public abstract class DataSet<T> {
 	 * Writes a DataSet to the standard output streams (stdout) of the TaskManagers that execute
 	 * the program (or more specifically, the data sink operators). On a typical cluster setup, the
 	 * data will appear in the TaskManagers' <i>.out</i> files.
-	 * 
+	 *
 	 * <p>To print the data to the console or stdout stream of the client process instead, use the
-	 * {@link #print()} method.</p>
-	 * 
-	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p>
+	 * {@link #print()} method.
+	 *
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.
 	 *
 	 * @param prefix The string to prefix each line of the output with. This helps identifying outputs
-	 *               from different printing sinks.   
+	 *               from different printing sinks.
 	 * @return The DataSink operator that writes the DataSet.
-	 *  
+	 *
 	 * @see #print()
 	 */
 	public DataSink<T> printOnTaskManager(String prefix) {
 		return output(new PrintingOutputFormat<T>(prefix, false));
 	}
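
A usage sketch (the prefix is arbitrary; unlike print(), this only adds a sink, so the job must still be started explicitly):

    DataSet<String> words = env.fromElements("foo", "bar");
    words.printOnTaskManager("debug-words"); // appears in the TaskManagers' .out files
    env.execute("printOnTaskManager example");
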
-	
+
 	/**
 	 * Writes a DataSet to the standard output stream (stdout).
-	 * 
-	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p>
+	 *
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.
 	 *
 	 * @param sinkIdentifier The string to prefix the output with.
 	 * @return The DataSink that writes the DataSet.
-	 * 
+	 *
 	 * @deprecated Use {@link #printOnTaskManager(String)} instead.
 	 */
 	@Deprecated
@@ -1666,29 +1713,28 @@ public abstract class DataSet<T> {
 
 	/**
 	 * Writes a DataSet to the standard error stream (stderr).
-	 * 
-	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p>
+	 *
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.
 	 *
 	 * @param sinkIdentifier The string to prefix the output with.
 	 * @return The DataSink that writes the DataSet.
-	 * 
-	 * @deprecated Use {@link #printOnTaskManager(String)} instead, othe 
-	 *             {@link PrintingOutputFormat} instead.
+	 *
+	 * @deprecated Use {@link #printOnTaskManager(String)} instead, or the {@link PrintingOutputFormat}.
 	 */
 	@Deprecated
 	@PublicEvolving
 	public DataSink<T> printToErr(String sinkIdentifier) {
 		return output(new PrintingOutputFormat<T>(sinkIdentifier, true));
 	}
-	
+
 	/**
 	 * Writes a DataSet using a {@link FileOutputFormat} to a specified location.
 	 * This method adds a data sink to the program.
-	 * 
+	 *
 	 * @param outputFormat The FileOutputFormat to write the DataSet.
 	 * @param filePath The path to the location where the DataSet is written.
 	 * @return The DataSink that writes the DataSet.
-	 * 
+	 *
 	 * @see FileOutputFormat
 	 */
 	public DataSink<T> write(FileOutputFormat<T> outputFormat, String filePath) {
@@ -1698,16 +1744,16 @@ public abstract class DataSet<T> {
 		outputFormat.setOutputFilePath(new Path(filePath));
 		return output(outputFormat);
 	}
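
A sketch with TextOutputFormat, which is a FileOutputFormat (the path is illustrative; write() sets it on the format before adding the sink):

    DataSet<String> lines = env.fromElements("a", "b", "c");
    lines.write(new TextOutputFormat<String>(new Path("file:///tmp/flink-out/text")),
            "file:///tmp/flink-out/text");
    env.execute("write example");
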
-	
+
 	/**
 	 * Writes a DataSet using a {@link FileOutputFormat} to a specified location.
 	 * This method adds a data sink to the program.
-	 * 
+	 *
 	 * @param outputFormat The FileOutputFormat to write the DataSet.
 	 * @param filePath The path to the location where the DataSet is written.
 	 * @param writeMode The mode of writing, indicating whether to overwrite existing files.
 	 * @return The DataSink that writes the DataSet.
-	 * 
+	 *
 	 * @see FileOutputFormat
 	 */
 	public DataSink<T> write(FileOutputFormat<T> outputFormat, String filePath, WriteMode writeMode) {
@@ -1719,26 +1765,26 @@ public abstract class DataSet<T> {
 		outputFormat.setWriteMode(writeMode);
 		return output(outputFormat);
 	}
-	
+
 	/**
 	 * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
 	 * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks
 	 * or transformations) at the same time.
-	 * 
+	 *
 	 * @param outputFormat The OutputFormat to process the DataSet.
 	 * @return The DataSink that processes the DataSet.
-	 * 
+	 *
 	 * @see OutputFormat
 	 * @see DataSink
 	 */
 	public DataSink<T> output(OutputFormat<T> outputFormat) {
 		Preconditions.checkNotNull(outputFormat);
-		
+
 		// configure the type if needed
 		if (outputFormat instanceof InputTypeConfigurable) {
-			((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig() );
+			((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig());
 		}
-		
+
 		DataSink<T> sink = new DataSink<>(this, outputFormat, getType());
 		this.context.registerDataSink(sink);
 		return sink;
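
A sketch using the PrintingOutputFormat that appears elsewhere in this class (the prefix is arbitrary; an ExecutionEnvironment env is assumed):

    DataSet<String> lines = env.fromElements("x", "y");
    lines.output(new PrintingOutputFormat<String>("generic-sink", false));
    env.execute("output example");
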
@@ -1747,12 +1793,11 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	//  Utilities
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected static void checkSameExecutionContext(DataSet<?> set1, DataSet<?> set2) {
 		if (set1.getExecutionEnvironment() != set2.getExecutionEnvironment()) {
 			throw new IllegalArgumentException("The two inputs have different execution contexts.");
 		}
 	}
 
-
 }