You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 16:23:03 UTC

[3/4] flink git commit: [FLINK-7191] Activate checkstyle flink-java/operators/translation

[FLINK-7191] Activate checkstyle flink-java/operators/translation

This closes #4334.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e975362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e975362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e975362

Branch: refs/heads/master
Commit: 8e975362312c727fd602429778bc1c3628b95619
Parents: 0c9c9fb
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:42:57 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 17:04:26 2017 +0200

----------------------------------------------------------------------
 .../CombineToGroupCombineWrapper.java           |   1 +
 .../translation/KeyExtractingMapper.java        |  23 +--
 .../translation/KeyRemovingMapper.java          |   9 +-
 .../PlanBothUnwrappingCoGroupOperator.java      |  22 +--
 .../translation/PlanFilterOperator.java         |  17 +-
 .../PlanLeftUnwrappingCoGroupOperator.java      |  15 +-
 .../translation/PlanProjectOperator.java        |  25 +--
 .../PlanRightUnwrappingCoGroupOperator.java     |  11 +-
 .../PlanUnwrappingGroupCombineOperator.java     |  29 ++--
 .../PlanUnwrappingReduceGroupOperator.java      |  47 +++---
 .../PlanUnwrappingReduceOperator.java           |  12 +-
 ...lanUnwrappingSortedGroupCombineOperator.java |  15 +-
 ...PlanUnwrappingSortedReduceGroupOperator.java |  23 ++-
 .../RichCombineToGroupCombineWrapper.java       |   3 +-
 .../translation/Tuple3UnwrappingIterator.java   |   4 +-
 .../translation/Tuple3WrappingCollector.java    |   3 +-
 .../translation/TupleLeftUnwrappingJoiner.java  |   8 +
 .../translation/TupleRightUnwrappingJoiner.java |   8 +
 .../translation/TupleUnwrappingIterator.java    |  14 +-
 .../translation/TupleUnwrappingJoiner.java      |   8 +
 .../translation/TupleWrappingCollector.java     |  13 +-
 .../translation/TwoKeyExtractingMapper.java     |  10 +-
 .../operators/translation/WrappingFunction.java |  13 +-
 .../translation/AggregateTranslationTest.java   |  35 ++--
 .../translation/CoGroupSortTranslationTest.java |  53 +++---
 .../DeltaIterationTranslationTest.java          | 160 ++++++++++---------
 .../translation/DistinctTranslationTest.java    |  15 +-
 .../translation/ReduceTranslationTests.java     |  98 ++++++------
 tools/maven/suppressions-java.xml               |   8 -
 29 files changed, 375 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
index 408d4b3..f574218 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index f35b950..1f0cc1d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -19,34 +19,37 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+/**
+ * Mapper that extracts keys.
+ * @param <T> type of value
+ * @param <K> type of key
+ */
 @Internal
 @ForwardedFields("*->1")
 public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private final KeySelector<T, K> keySelector;
-	
+
 	private final Tuple2<K, T> tuple = new Tuple2<K, T>();
-	
-	
+
 	public KeyExtractingMapper(KeySelector<T, K> keySelector) {
 		this.keySelector = keySelector;
 	}
-	
-	
+
 	@Override
 	public Tuple2<K, T> map(T value) throws Exception {
-		
+
 		K key = keySelector.getKey(value);
 		tuple.f0 = key;
 		tuple.f1 = value;
-		
+
 		return tuple;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index 5f0de32..920f893 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -23,12 +23,17 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+/**
+ * Mapper that removes keys.
+ * @param <T> type of values
+ * @param <K> type of keys
+ */
 @Internal
 @ForwardedFields("1->*")
 public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
 	public T map(Tuple2<K, T> value) {
 		return value.f1;

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
index 1814329..6ccf0f3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * A co group operator that applies the operation only on the unwrapped values.
+ */
 @Internal
 public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
-		extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
-{
+		extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> {
 
 	public PlanBothUnwrappingCoGroupOperator(
 			CoGroupFunction<I1, I2, OUT> udf,
@@ -52,23 +54,21 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				name);
 	}
 
-	public static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
+	private static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
 			extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
-			implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
-	{
+			implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private final TupleUnwrappingIterator<I1, K> iter1;
 		private final TupleUnwrappingIterator<I2, K> iter2;
-		
+
 		private TupleBothUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
 			super(wrapped);
-			
+
 			this.iter1 = new TupleUnwrappingIterator<I1, K>();
 			this.iter2 = new TupleUnwrappingIterator<I2, K>();
 		}
 
-
 		@Override
 		public void coGroup(
 				Iterable<Tuple2<K, I1>> records1,
@@ -79,6 +79,6 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
 			iter2.set(records2.iterator());
 			this.wrappedFunction.coGroup(iter1, iter2, out);
 		}
-		
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index c93191f..ecf1aac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -27,26 +27,33 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.util.Collector;
 
+/**
+ * @see FilterOperatorBase
+ * @param <T>
+ */
 @Internal
 @ForwardedFields("*")
 public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> {
-	
+
 	public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) {
 		super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name);
 	}
 
+	/**
+	 * @see FlatMapFunction
+	 * @param <T>
+	 */
 	public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>>
-		implements FlatMapFunction<T, T>
-	{
+		implements FlatMapFunction<T, T> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private FlatMapFilter(FilterFunction<T> wrapped) {
 			super(wrapped);
 		}
 
 		@Override
-		public final void flatMap(T value, Collector<T> out) throws Exception {
+		public void flatMap(T value, Collector<T> out) throws Exception {
 			if (this.wrappedFunction.filter(value)) {
 				out.collect(value);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
index 78840ce..b2a6937 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the left.
+ */
 @Internal
 public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
-		extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>>
-{
+		extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>> {
 
 	public PlanLeftUnwrappingCoGroupOperator(
 			CoGroupFunction<I1, I2, OUT> udf,
@@ -52,21 +54,20 @@ public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				name);
 	}
 
-	public static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
+	private static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
 			extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
 			implements CoGroupFunction<Tuple2<K, I1>, I2, OUT> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private final TupleUnwrappingIterator<I1, K> iter1;
 
 		private TupleLeftUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
 			super(wrapped);
-			
+
 			this.iter1 = new TupleUnwrappingIterator<I1, K>();
 		}
 
-
 		@Override
 		public void coGroup(
 				Iterable<Tuple2<K, I1>> records1,

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index fe981a5..3960807 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -27,30 +27,33 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 
+/**
+ * A map operator that retains a subset of fields from incoming tuples.
+ *
+ * @param <T> Input tuple type
+ * @param <R> Output tuple type
+ */
 @Internal
 public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
 
 	public PlanProjectOperator(int[] fields, String name,
 								TypeInformation<T> inType, TypeInformation<R> outType,
-								ExecutionConfig executionConfig)
-	{
+								ExecutionConfig executionConfig) {
 		super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
 		return (MapFunction<T, R>) new MapProjector<X, R>(fields);
 	}
-	
-	
-	public static final class MapProjector<T extends Tuple, R extends Tuple> 
-			extends AbstractRichFunction implements MapFunction<T, R>
-	{
+
+	private static final class MapProjector<T extends Tuple, R extends Tuple>
+			extends AbstractRichFunction implements MapFunction<T, R> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private final int[] fields;
 		private final Tuple outTuple;
-		
+
 		private MapProjector(int[] fields) {
 			this.fields = fields;
 			try {
@@ -69,7 +72,7 @@ public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T,
 			for (int i = 0; i < fields.length; i++) {
 				outTuple.setField(inTuple.getField(fields[i]), i);
 			}
-			
+
 			return (R) outTuple;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
index faeca4e..f86deb7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the right.
+ */
 @Internal
 public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
-		extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>>
-{
+		extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>> {
 
 	public PlanRightUnwrappingCoGroupOperator(
 			CoGroupFunction<I1, I2, OUT> udf,
@@ -52,7 +54,7 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				name);
 	}
 
-	public static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
+	private static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
 			extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
 			implements CoGroupFunction<I1, Tuple2<K, I2>, OUT> {
 
@@ -66,7 +68,6 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
 			this.iter2 = new TupleUnwrappingIterator<I2, K>();
 		}
 
-
 		@Override
 		public void coGroup(
 				Iterable<I1> records1,

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index e9feb61..6e6f226 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
@@ -35,35 +35,32 @@ import org.apache.flink.util.Collector;
 public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {
 
 	public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
-												TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
-	{
+												TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey) {
 		super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
 				new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-		
+
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
-		implements GroupCombineFunction<Tuple2<K, IN>, OUT>
-	{
-	
+
+	private static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+		implements GroupCombineFunction<Tuple2<K, IN>, OUT> {
+
 		private static final long serialVersionUID = 1L;
-		
-		private final TupleUnwrappingIterator<IN, K> iter; 
-		
+
+		private final TupleUnwrappingIterator<IN, K> iter;
+
 		private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new TupleUnwrappingIterator<IN, K>();
 		}
-	
-	
+
 		@Override
 		public void combine(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
 			this.wrappedFunction.combine(iter, out);
 		}
-		
+
 		@Override
 		public String toString() {
 			return this.wrappedFunction.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 8568659..33c527d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  * on the unwrapped values.
  */
 @Internal
-public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> {
+public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>, OUT>> {
 
 	public PlanUnwrappingReduceGroupOperator(
 		GroupReduceFunction<IN, OUT> udf,
@@ -41,32 +41,30 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 		String name,
 		TypeInformation<OUT> outType,
 		TypeInformation<Tuple2<K, IN>> typeInfoWithKey,
-		boolean combinable)
-	{
+		boolean combinable) {
 		super(
 			combinable ?
 				new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>(udf) :
 				new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
 			new UnaryOperatorInformation<>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-		
+
 		super.setCombinable(combinable);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
-	{
+
+	private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private TupleUnwrappingIterator<IN, K> iter;
 		private TupleWrappingCollector<IN, K> coll;
 
 		private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 
-			if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+			if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
 				throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
 			}
 
@@ -74,7 +72,6 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 			this.coll = new TupleWrappingCollector<>(this.iter);
 		}
 
-
 		@Override
 		public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
@@ -87,35 +84,33 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 
 			iter.set(values.iterator());
 			coll.set(out);
-			((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+			((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
 		}
-		
+
 		@Override
 		public String toString() {
 			return this.wrappedFunction.toString();
 		}
 	}
-	
-	public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple2<K, IN>, OUT>
-	{
-	
+
+	private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT> {
+
 		private static final long serialVersionUID = 1L;
-		
-		private final TupleUnwrappingIterator<IN, K> iter; 
-		
+
+		private final TupleUnwrappingIterator<IN, K> iter;
+
 		private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new TupleUnwrappingIterator<>();
 		}
-	
-	
+
 		@Override
 		public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
 			this.wrappedFunction.reduce(iter, out);
 		}
-		
+
 		@Override
 		public String toString() {
 			return this.wrappedFunction.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 72dc41a..b2e614e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-
 /**
  * A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only
  * on the unwrapped values.
@@ -35,16 +34,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> {
 
 	public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, Keys.SelectorFunctionKeys<T, K> key, String name,
-			TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey)
-	{
+			TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey) {
 		super(new ReduceWrapper<T, K>(udf), new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey), key.computeLogicalKeyPositions(), name);
 	}
 
-	public static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
-		implements ReduceFunction<Tuple2<K, T>>
-	{
+	private static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
+		implements ReduceFunction<Tuple2<K, T>> {
 		private static final long serialVersionUID = 1L;
-		
 
 		private ReduceWrapper(ReduceFunction<T> wrapped) {
 			super(wrapped);

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index f65f169..a2a9010 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
@@ -32,21 +32,19 @@ import org.apache.flink.util.Collector;
  * operation only on the unwrapped values.
  */
 @Internal
-public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>> {
 
 	public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
-													TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
-	{
+													TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey) {
 		super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
 				new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType),
-				groupingKey.computeLogicalKeyPositions(), 
+				groupingKey.computeLogicalKeyPositions(),
 				name);
 
 	}
 
-	public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
-			implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>
-	{
+	private static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+			implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -57,7 +55,6 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends G
 			this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
 		}
 
-
 		@Override
 		public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 8080477..7f81fee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  * operation only on the unwrapped values.
  */
 @Internal
-public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>> {
 
 	public PlanUnwrappingSortedReduceGroupOperator(
 		GroupReduceFunction<IN, OUT> udf,
@@ -42,8 +42,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		String name,
 		TypeInformation<OUT> outType,
 		TypeInformation<Tuple3<K1, K2, IN>>
-		typeInfoWithKey, boolean combinable)
-	{
+		typeInfoWithKey, boolean combinable) {
 		super(
 			combinable ?
 				new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) :
@@ -55,9 +54,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 
 	// --------------------------------------------------------------------------------------------
 
-	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
-	{
+	private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -67,7 +65,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 
-			if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+			if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
 				throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
 			}
 
@@ -75,7 +73,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 			this.coll = new Tuple3WrappingCollector<>(this.iter);
 		}
 
-
 		@Override
 		public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
@@ -87,7 +84,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception {
 			iter.set(values.iterator());
 			coll.set(out);
-			((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+			((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
 		}
 
 		@Override
@@ -96,9 +93,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		}
 	}
 
-	public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>
-	{
+	private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -109,7 +105,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 			this.iter = new Tuple3UnwrappingIterator<>();
 		}
 
-
 		@Override
 		public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
index d8c54d6..3f6463a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.api.common.functions.CombineFunction;
@@ -31,7 +32,7 @@ import org.apache.flink.util.Preconditions;
  * and makes it look like a function that implements {@link GroupCombineFunction} and {@link GroupReduceFunction} to the runtime.
  */
 public class RichCombineToGroupCombineWrapper<IN, OUT, F extends RichGroupReduceFunction<IN, OUT> & CombineFunction<IN, IN>>
-	extends RichGroupCombineFunction<IN,IN> implements GroupReduceFunction<IN, OUT> {
+	extends RichGroupCombineFunction<IN, IN> implements GroupReduceFunction<IN, OUT> {
 
 	private final F wrappedFunction;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
index fd3b4f6..b697ac9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.TraversableOnceException;
 
+import java.util.Iterator;
+
 /**
  * An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (third field).
  * The iterator also tracks the groupKeys, as the triples flow though it.

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
index 189dcdb..57b6bc7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
 /**
- * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting
+ * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting.
  */
 @Internal
 public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.io.Serializable {
@@ -35,7 +35,6 @@ public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.
 
 	private Collector<Tuple3<K1, K2, IN>> wrappedCollector;
 
-
 	public Tuple3WrappingCollector(Tuple3UnwrappingIterator<IN, K1, K2> tui) {
 		this.tui = tui;
 		this.outTuple = new Tuple3<K1, K2, IN>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
index 2ff73ef..e39ec47 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * Joiner that unwraps values from the left set before applying the join operation.
+ *
+ * @param <I1>  type of values in the left set
+ * @param <I2>  type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K>   type of key
+ */
 @Internal
 public final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
 		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
index c9b9c27..847acd7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * Joiner that unwraps values from the right set before applying the join operation.
+ *
+ * @param <I1>  type of values in the left set
+ * @param <I2>  type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K>   type of key
+ */
 @Internal
 public final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
 		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
index 5dbe266..16ebef8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.TraversableOnceException;
 
+import java.util.Iterator;
+
 /**
  * An iterator that reads 2-tuples (key value pairs) and returns only the values (second field).
  * The iterator also tracks the keys, as the pairs flow though it.
@@ -32,16 +32,16 @@ import org.apache.flink.util.TraversableOnceException;
 public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>, java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
-	
-	private K lastKey; 
+
+	private K lastKey;
 	private Iterator<Tuple2<K, T>> iterator;
 	private boolean iteratorAvailable;
-	
+
 	public void set(Iterator<Tuple2<K, T>> iterator) {
 		this.iterator = iterator;
 		this.iteratorAvailable = true;
 	}
-	
+
 	public K getLastKey() {
 		return lastKey;
 	}
@@ -53,7 +53,7 @@ public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>,
 
 	@Override
 	public T next() {
-		Tuple2<K, T> t = iterator.next(); 
+		Tuple2<K, T> t = iterator.next();
 		this.lastKey = t.f0;
 		return t.f1;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
index e0ee3b3..1e56dac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * Joiner that unwraps both values before applying the join operation.
+ *
+ * @param <I1>  type of values in the left set
+ * @param <I2>  type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K>   type of key
+ */
 @Internal
 public final class TupleUnwrappingJoiner<I1, I2, OUT, K>
 		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
index 4581bf2..7369804 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
@@ -23,28 +23,27 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
 /**
- * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function
+ * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function.
  */
 @Internal
 public class TupleWrappingCollector<IN, K> implements Collector<IN>, java.io.Serializable {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	private final TupleUnwrappingIterator<IN, K> tui;
 	private final Tuple2<K, IN> outTuple;
-	
+
 	private Collector<Tuple2<K, IN>> wrappedCollector;
-	
-	
+
 	public TupleWrappingCollector(TupleUnwrappingIterator<IN, K> tui) {
 		this.tui = tui;
 		this.outTuple = new Tuple2<K, IN>();
 	}
-	
+
 	public void set(Collector<Tuple2<K, IN>> wrappedCollector) {
 			this.wrappedCollector = wrappedCollector;
 	}
-		
+
 	@Override
 	public void close() {
 		this.wrappedCollector.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 7d5e39b..9449237 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -19,11 +19,17 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 
+/**
+ * Mapper that extracts two keys of a value.
+ * @param <T> type of the values
+ * @param <K1> type of the first key
+ * @param <K2> type of the second key
+ */
 @Internal
 @ForwardedFields("*->2")
 public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> {
@@ -36,13 +42,11 @@ public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T,
 
 	private final Tuple3<K1, K2, T> tuple = new Tuple3<K1, K2, T>();
 
-
 	public TwoKeyExtractingMapper(KeySelector<T, K1> keySelector1, KeySelector<T, K2> keySelector2) {
 		this.keySelector1 = keySelector1;
 		this.keySelector2 = keySelector2;
 	}
 
-
 	@Override
 	public Tuple3<K1, K2, T> map(T value) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 9851c42..6c52ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -25,9 +25,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 
+/**
+ * Wrapper around {@link Function}.
+ * @param <T>
+ */
 @Internal
 public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	protected T wrappedFunction;
@@ -36,21 +40,20 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
 		this.wrappedFunction = wrappedFunction;
 	}
 
-	
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		FunctionUtils.openFunction(this.wrappedFunction, parameters);
 	}
-	
+
 	@Override
 	public void close() throws Exception {
 		FunctionUtils.closeFunction(this.wrappedFunction);
 	}
-	
+
 	@Override
 	public void setRuntimeContext(RuntimeContext t) {
 		super.setRuntimeContext(t);
-		
+
 		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 0ce79e3..2f74288 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -16,25 +16,28 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of aggregations.
+ */
 public class AggregateTranslationTest {
 
 	@Test
@@ -42,26 +45,26 @@ public class AggregateTranslationTest {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Double, StringValue, Long>> initialData = 
+			DataSet<Tuple3<Double, StringValue, Long>> initialData =
 					env.fromElements(new Tuple3<Double, StringValue, Long>(3.141592, new StringValue("foobar"), Long.valueOf(77)));
-			
+
 			initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, Long>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
+
 			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-			
+
 			// check keys
 			assertEquals(1, reducer.getKeyColumns(0).length);
 			assertEquals(0, reducer.getKeyColumns(0)[0]);
-			
+
 			assertEquals(-1, reducer.getParallelism());
 			assertTrue(reducer.isCombinable());
-			
+
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
index 887173d..9c67e60 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -31,8 +29,16 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of co-group sort.
+ */
 @SuppressWarnings({"serial", "unchecked"})
 public class CoGroupSortTranslationTest implements java.io.Serializable {
 
@@ -40,35 +46,35 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 	public void testGroupSortTuples() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-			
+
 			input1.coGroup(input2)
 				.where(1).equalTo(2)
 				.sortFirstGroup(0, Order.DESCENDING)
 				.sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING)
-				
+
 				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>, Long>() {
 					@Override
 					public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple3<Long, Long, Long>> second,
 							Collector<Long> out) {}
 				})
-				
+
 				.output(new DiscardingOutputFormat<Long>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 			CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-			
+
 			assertNotNull(coGroup.getGroupOrderForInputOne());
 			assertNotNull(coGroup.getGroupOrderForInputTwo());
-			
+
 			assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
 			assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
 			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-			
+
 			assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
 			assertEquals(1, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
 			assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -80,39 +86,39 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSortTuplesAndPojos() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 			DataSet<TestPoJo> input2 = env.fromElements(new TestPoJo());
-			
+
 			input1.coGroup(input2)
 				.where(1).equalTo("b")
 				.sortFirstGroup(0, Order.DESCENDING)
 				.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-				
+
 				.with(new CoGroupFunction<Tuple2<Long, Long>, TestPoJo, Long>() {
 					@Override
 					public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
 				})
-				
+
 				.output(new DiscardingOutputFormat<Long>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 			CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-			
+
 			assertNotNull(coGroup.getGroupOrderForInputOne());
 			assertNotNull(coGroup.getGroupOrderForInputTwo());
-			
+
 			assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
 			assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
 			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-			
+
 			assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
 			assertEquals(2, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
 			assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -124,7 +130,10 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Sample test pojo.
+	 */
 	public static class TestPoJo {
 		public long a;
 		public long b;

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index fd60bc6..e4cb8c4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -16,93 +16,95 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Iterator;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
+import java.util.Iterator;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of delta iterations.
+ */
 @SuppressWarnings("serial")
 public class DeltaIterationTranslationTest implements java.io.Serializable {
 
 	@Test
 	public void testCorrectTranslation() {
 		try {
-			final String JOB_NAME = "Test JobName";
-			final String ITERATION_NAME = "Test Name";
-			
-			final String BEFORE_NEXT_WORKSET_MAP = "Some Mapper";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final int[] ITERATION_KEYS = new int[] {2};
-			final int NUM_ITERATIONS = 13;
-			
-			final int DEFAULT_parallelism= 133;
-			final int ITERATION_parallelism = 77;
-			
+			final String jobName = "Test JobName";
+			final String iterationName = "Test Name";
+
+			final String beforeNextWorksetMap = "Some Mapper";
+
+			final String aggregatorName = "AggregatorName";
+
+			final int[] iterationKeys = new int[] {2};
+			final int numIterations = 13;
+
+			final int defaultParallelism = 133;
+			final int iterationParallelism = 77;
+
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			// ------------ construct the test program ------------------
 			{
-				env.setParallelism(DEFAULT_parallelism);
-				
+				env.setParallelism(defaultParallelism);
+
 				@SuppressWarnings("unchecked")
 				DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
-	
+
 				@SuppressWarnings("unchecked")
 				DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-				
-				DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS);
-				iteration.name(ITERATION_NAME).parallelism(ITERATION_parallelism);
-				
-				iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				
+
+				DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, numIterations, iterationKeys);
+				iteration.name(iterationName).parallelism(iterationParallelism);
+
+				iteration.registerAggregator(aggregatorName, new LongSumAggregator());
+
 				// test that multiple workset consumers are supported
-				DataSet<Tuple2<Double, String>> worksetSelfJoin = 
+				DataSet<Tuple2<Double, String>> worksetSelfJoin =
 					iteration.getWorkset()
-						.map(new IdentityMapper<Tuple2<Double,String>>())
+						.map(new IdentityMapper<Tuple2<Double, String>>())
 						.join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1);
-				
+
 				DataSet<Tuple3<Double, Long, String>> joined = worksetSelfJoin.join(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetJoin());
 
 				DataSet<Tuple3<Double, Long, String>> result = iteration.closeWith(
 						joined,
-						joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP));
-				
+						joined.map(new NextWorksetMapper()).name(beforeNextWorksetMap));
+
 				result.output(new DiscardingOutputFormat<Tuple3<Double, Long, String>>());
 				result.writeAsText("/dev/null");
 			}
-			
-			
-			Plan p = env.createProgramPlan(JOB_NAME);
-			
+
+			Plan p = env.createProgramPlan(jobName);
+
 			// ------------- validate the plan ----------------
-			assertEquals(JOB_NAME, p.getJobName());
-			assertEquals(DEFAULT_parallelism, p.getDefaultParallelism());
-			
+			assertEquals(jobName, p.getJobName());
+			assertEquals(defaultParallelism, p.getDefaultParallelism());
+
 			// validate the iteration
 			GenericDataSinkBase<?> sink1, sink2;
 			{
@@ -110,23 +112,23 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 				sink1 = sinks.next();
 				sink2 = sinks.next();
 			}
-			
+
 			DeltaIterationBase<?, ?> iteration = (DeltaIterationBase<?, ?>) sink1.getInput();
-			
+
 			// check that multi consumer translation works for iterations
 			assertEquals(iteration, sink2.getInput());
-			
+
 			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations());
-			assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
+			assertEquals(numIterations, iteration.getMaximumNumberOfIterations());
+			assertArrayEquals(iterationKeys, iteration.getSolutionSetKeyFields());
+			assertEquals(iterationParallelism, iteration.getParallelism());
+			assertEquals(iterationName, iteration.getName());
+
 			MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();
 			InnerJoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
 			InnerJoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
 			MapOperatorBase<?, ?, ?> worksetMapper = (MapOperatorBase<?, ?, ?>) worksetSelfJoin.getFirstInput();
-			
+
 			assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
 			assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
 			if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
@@ -137,9 +139,9 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 				assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
 			}
 
-			assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			assertEquals(beforeNextWorksetMap, nextWorksetMapper.getName());
+
+			assertEquals(aggregatorName, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -147,20 +149,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testRejectWhenSolutionSetKeysDontMatchJoin() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
 
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-			
+
 			DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-			
+
 			try {
 				iteration.getWorkset().join(iteration.getSolutionSet()).where(1).equalTo(2);
 				fail("Accepted invalid program.");
@@ -168,7 +170,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			catch (InvalidProgramException e) {
 				// all good!
 			}
-			
+
 			try {
 				iteration.getSolutionSet().join(iteration.getWorkset()).where(2).equalTo(1);
 				fail("Accepted invalid program.");
@@ -183,20 +185,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testRejectWhenSolutionSetKeysDontMatchCoGroup() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
 
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-			
+
 			DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-			
+
 			try {
 				iteration.getWorkset().coGroup(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetCoGroup1());
 				fail("Accepted invalid program.");
@@ -204,7 +206,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			catch (InvalidProgramException e) {
 				// all good!
 			}
-			
+
 			try {
 				iteration.getSolutionSet().coGroup(iteration.getWorkset()).where(2).equalTo(1).with(new SolutionWorksetCoGroup2());
 				fail("Accepted invalid program.");
@@ -219,40 +221,40 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+	private static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
 		@Override
 		public Tuple3<Double, Long, String> join(Tuple2<Double, String> first, Tuple3<Double, Long, String> second){
 			return null;
 		}
 	}
-	
-	public static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
+
+	private static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
 		@Override
 		public Tuple2<Double, String> map(Tuple3<Double, Long, String> value) {
 			return null;
 		}
 	}
-	
-	public static class IdentityMapper<T> extends RichMapFunction<T, T> {
+
+	private static class IdentityMapper<T> extends RichMapFunction<T, T> {
 
 		@Override
 		public T map(T value) throws Exception {
 			return value;
 		}
 	}
-	
-	public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+	private static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
 
 		@Override
 		public void coGroup(Iterable<Tuple2<Double, String>> first, Iterable<Tuple3<Double, Long, String>> second,
 				Collector<Tuple3<Double, Long, String>> out) {
 		}
 	}
-	
-	public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+
+	private static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
 
 		@Override
 		public void coGroup(Iterable<Tuple3<Double, Long, String>> second, Iterable<Tuple2<Double, String>> first,

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 27c7b2f..6a98a8d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Test;
 
 import java.io.Serializable;
@@ -45,6 +46,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for translation of distinct operation.
+ */
 @SuppressWarnings("serial")
 public class DistinctTranslationTest {
 
@@ -164,7 +168,7 @@ public class DistinctTranslationTest {
 
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 
-			initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+			initialData.distinct(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
 				public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
 					return value.f1;
 				}
@@ -183,7 +187,7 @@ public class DistinctTranslationTest {
 			assertEquals(4, reducer.getParallelism());
 
 			// check types
-			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
 					new ValueTypeInfo<StringValue>(StringValue.class),
 					initialData.getType());
 
@@ -245,7 +249,7 @@ public class DistinctTranslationTest {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+	private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
 		return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
 				.setParallelism(1);
 	}
@@ -256,6 +260,9 @@ public class DistinctTranslationTest {
 		return env.fromCollection(data);
 	}
 
+	/**
+	 * Custom data type, for testing purposes.
+	 */
 	public static class CustomType implements Serializable {
 
 		private static final long serialVersionUID = 1L;
@@ -269,7 +276,7 @@ public class DistinctTranslationTest {
 
 		@Override
 		public String toString() {
-			return ""+myInt;
+			return "" + myInt;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index 3adbbb8..486cad4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -37,10 +36,17 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of reduce operation.
+ */
 @SuppressWarnings("serial")
 public class ReduceTranslationTests implements java.io.Serializable {
 
@@ -49,31 +55,31 @@ public class ReduceTranslationTests implements java.io.Serializable {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-			
-			initialData.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+
+			initialData.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
 				public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 					return value1;
 				}
 			}).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
+
 			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-			
+
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-			
+
 			// check keys
 			assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0);
-			
+
 			// parallelism was not configured on the operator
 			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-			
+
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {
@@ -82,40 +88,40 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			fail("Test caused an error: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void translateGroupedReduceNoMapper() {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-			
+
 			initialData
 				.groupBy(2)
-				.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+				.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
 					public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 						return value1;
 					}
 				})
 				.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
+
 			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-			
+
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-			
+
 			// parallelism was not configured on the operator
 			assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-			
+
 			// check keys
 			assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0));
-			
+
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {
@@ -124,60 +130,58 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			fail("Test caused an error: " + e.getMessage());
 		}
 	}
-	
-	
+
 	@Test
 	public void translateGroupedReduceWithkeyExtractor() {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-			
+
 			initialData
-				.groupBy(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+				.groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
 					public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
 						return value.f1;
 					}
 				})
-				.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+				.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
 					public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 						return value1;
 					}
 				}).setParallelism(4)
 				.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
-			
+
 			MapOperatorBase<?, ?, ?> keyProjector = (MapOperatorBase<?, ?, ?>) sink.getInput();
 			PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyProjector.getInput();
 			MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
-			
+
 			// check the parallelisms
 			assertEquals(1, keyExtractor.getParallelism());
 			assertEquals(4, reducer.getParallelism());
 			assertEquals(4, keyProjector.getParallelism());
-			
+
 			// check types
-			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
 					new ValueTypeInfo<StringValue>(StringValue.class),
 					initialData.getType());
-			
+
 			assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType());
 			assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
-			
+
 			assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
 			assertEquals(keyValueInfo, reducer.getOperatorInfo().getOutputType());
-			
+
 			assertEquals(keyValueInfo, keyProjector.getOperatorInfo().getInputType());
 			assertEquals(initialData.getType(), keyProjector.getOperatorInfo().getOutputType());
-			
+
 			// check keys
 			assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
-			
+
 			assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {
@@ -186,9 +190,9 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			fail("Test caused an error: " + e.getMessage());
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
-	private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+	private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
 		return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
 				.setParallelism(1);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
index d7e42e5..3bb8556 100644
--- a/tools/maven/suppressions-java.xml
+++ b/tools/maven/suppressions-java.xml
@@ -33,14 +33,6 @@ under the License.
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 
 	<suppress
-		files="(.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
-		checks="AvoidStarImport|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
-		checks="AvoidStarImport"/>
-
-	<suppress
 		files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>