You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 11:59:10 UTC

[03/12] flink git commit: [FLINK-7192] [java] Activate checkstyle flink-java/test/operator

[FLINK-7192] [java] Activate checkstyle flink-java/test/operator

This closes #4335.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca5d8afe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca5d8afe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca5d8afe

Branch: refs/heads/master
Commit: ca5d8afee8321e0dff063e8404538b132e979739
Parents: 80468b1
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:41:50 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 12:11:11 2017 +0200

----------------------------------------------------------------------
 .../java/operator/AggregateOperatorTest.java    |  56 +-
 .../api/java/operator/CoGroupOperatorTest.java  | 179 ++---
 .../api/java/operator/CrossOperatorTest.java    | 105 +--
 .../flink/api/java/operator/DataSinkTest.java   |  15 +-
 .../api/java/operator/DistinctOperatorTest.java |  93 +--
 .../api/java/operator/FirstNOperatorTest.java   |  90 +--
 .../operator/FullOuterJoinOperatorTest.java     |  12 +-
 .../java/operator/GroupCombineOperatorTest.java |  36 +-
 .../java/operator/GroupReduceOperatorTest.java  |  36 +-
 .../flink/api/java/operator/GroupingTest.java   | 145 ++--
 .../api/java/operator/JoinOperatorTest.java     | 447 +++++------
 .../operator/LeftOuterJoinOperatorTest.java     |  17 +-
 .../api/java/operator/MaxByOperatorTest.java    |  51 +-
 .../api/java/operator/MinByOperatorTest.java    |  54 +-
 .../flink/api/java/operator/OperatorTest.java   |   4 +
 .../java/operator/PartitionOperatorTest.java    | 746 ++++++++++---------
 .../java/operator/ProjectionOperatorTest.java   | 100 +--
 .../api/java/operator/ReduceOperatorTest.java   |  28 +-
 .../operator/RightOuterJoinOperatorTest.java    |  12 +-
 .../api/java/operator/SortPartitionTest.java    |  38 +-
 20 files changed, 1179 insertions(+), 1085 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
index d9678eb..7977b68 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
@@ -18,28 +18,32 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import org.junit.Assert;
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#aggregate(Aggregations, int)}.
+ */
 public class AggregateOperatorTest {
 
 	// TUPLE DATA
-	
-	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
 			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
@@ -47,59 +51,59 @@ public class AggregateOperatorTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
 	// LONG DATA
-	
+
 	private final List<Long> emptyLongData = new ArrayList<Long>();
-	
+
 	@Test
 	public void testFieldsAggregate() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.aggregate(Aggregations.SUM, 1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work: index out of bounds
 		try {
 			tupleDs.aggregate(Aggregations.SUM, 10);
 			Assert.fail();
-		} catch(IllegalArgumentException iae) {
+		} catch (IllegalArgumentException iae) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work: not applied to tuple dataset
 		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
 		try {
 			longDs.aggregate(Aggregations.MIN, 1);
 			Assert.fail();
-		} catch(InvalidProgramException uoe) {
+		} catch (InvalidProgramException uoe) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 	}
-	
+
 	@Test
 	public void testAggregationTypes() {
 		try {
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-			
+
 			// should work: multiple aggregates
 			tupleDs.aggregate(Aggregations.SUM, 0).and(Aggregations.MIN, 4);
 
 			// should work: nested aggregates
 			tupleDs.aggregate(Aggregations.MIN, 2).aggregate(Aggregations.SUM, 1);
-			
+
 			// should not work: average on string
 			try {
 				tupleDs.aggregate(Aggregations.SUM, 2);
@@ -108,7 +112,7 @@ public class AggregateOperatorTest {
 				// we're good here
 			}
 		}
-		catch(Exception e) {
+		catch (Exception e) {
 			System.err.println(e.getMessage());
 			e.printStackTrace();
 			Assert.fail(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index 90a65e6..d8b5f00 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,14 +42,17 @@ import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link DataSet#coGroup(DataSet)}.
+ */
 @SuppressWarnings("serial")
 public class CoGroupOperatorTest {
 
 	// TUPLE DATA
-	private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
+	private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
 			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
@@ -56,17 +60,17 @@ public class CoGroupOperatorTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
 	private static List<CustomType> customTypeData = new ArrayList<CustomType>();
-	
+
 	@BeforeClass
 	public static void insertCustomData() {
 		customTypeData.add(new CustomType());
 	}
-	
-	@Test  
+
+	@Test
 	public void testCoGroupKeyFields1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -74,14 +78,14 @@ public class CoGroupOperatorTest {
 		// should work
 		try {
 			ds1.coGroup(ds2).where(0).equalTo(0);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyFields2() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -89,21 +93,21 @@ public class CoGroupOperatorTest {
 		// should not work, incompatible cogroup key types
 		ds1.coGroup(ds2).where(0).equalTo(2);
 	}
-	
+
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyFields3() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should not work, incompatible number of cogroup keys
-		ds1.coGroup(ds2).where(0,1).equalTo(2);
+		ds1.coGroup(ds2).where(0, 1).equalTo(2);
 	}
-	
+
 	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCoGroupKeyFields4() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -111,10 +115,10 @@ public class CoGroupOperatorTest {
 		// should not work, cogroup key out of range
 		ds1.coGroup(ds2).where(5).equalTo(0);
 	}
-	
+
 	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCoGroupKeyFields5() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -122,10 +126,10 @@ public class CoGroupOperatorTest {
 		// should not work, negative key field position
 		ds1.coGroup(ds2).where(-1).equalTo(-1);
 	}
-	
+
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyFields6() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -144,7 +148,7 @@ public class CoGroupOperatorTest {
 		// should work
 		try {
 			ds1.coGroup(ds2).where("myInt").equalTo("myInt");
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -253,7 +257,7 @@ public class CoGroupOperatorTest {
 
 		ds1.coGroup(ds2).where("*").equalTo("*");
 	}
-	
+
 	@Test
 	public void testCoGroupKeyExpressions1Nested() {
 
@@ -264,7 +268,7 @@ public class CoGroupOperatorTest {
 		// should work
 		try {
 			ds1.coGroup(ds2).where("nested.myInt").equalTo("nested.myInt");
-		} catch(Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			Assert.fail();
 		}
@@ -302,10 +306,10 @@ public class CoGroupOperatorTest {
 		// should not work, cogroup key non-existent
 		ds1.coGroup(ds2).where("nested.myNonExistent").equalTo("nested.myInt");
 	}
-	
+
 	@Test
 	public void testCoGroupKeySelectors1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -315,7 +319,7 @@ public class CoGroupOperatorTest {
 			ds1.coGroup(ds2)
 			.where(
 					new KeySelector<CustomType, Long>() {
-							
+
 							@Override
 							public Long getKey(CustomType value) {
 								return value.myLong;
@@ -324,32 +328,31 @@ public class CoGroupOperatorTest {
 					)
 			.equalTo(
 					new KeySelector<CustomType, Long>() {
-							
+
 							@Override
 							public Long getKey(CustomType value) {
 								return value.myLong;
 							}
 						}
 					);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCoGroupKeyMixing1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
-		
 
 		// should work
 		try {
 			ds1.coGroup(ds2)
 			.where(
 					new KeySelector<CustomType, Long>() {
-							
+
 							@Override
 							public Long getKey(CustomType value) {
 								return value.myLong;
@@ -357,14 +360,14 @@ public class CoGroupOperatorTest {
 						}
 					)
 			.equalTo(3);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCoGroupKeyMixing2() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -375,21 +378,21 @@ public class CoGroupOperatorTest {
 			.where(3)
 			.equalTo(
 					new KeySelector<CustomType, Long>() {
-							
+
 							@Override
 							public Long getKey(CustomType value) {
 								return value.myLong;
 							}
 						}
 					);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyMixing3() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -399,7 +402,7 @@ public class CoGroupOperatorTest {
 		.where(2)
 		.equalTo(
 				new KeySelector<CustomType, Long>() {
-						
+
 						@Override
 						public Long getKey(CustomType value) {
 							return value.myLong;
@@ -407,20 +410,20 @@ public class CoGroupOperatorTest {
 					}
 				);
 	}
-	
+
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyMixing4() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, more than one key field position
 		ds1.coGroup(ds2)
-		.where(1,3)
+		.where(1, 3)
 		.equalTo(
 				new KeySelector<CustomType, Long>() {
-						
+
 						@Override
 						public Long getKey(CustomType value) {
 							return value.myLong;
@@ -436,32 +439,32 @@ public class CoGroupOperatorTest {
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+		CoGroupOperator<?, ?, ?> coGroupOp = tupleDs1.coGroup(tupleDs2)
 				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
 				.with(new DummyTestCoGroupFunction1());
 
 		SemanticProperties semProps = coGroupOp.getSemanticProperties();
 
 		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
-		assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
-		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
-		assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
-		assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
-		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
-
-		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
-		assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
-		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
-		assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(1, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1, 6).contains(0));
 
 		assertTrue(semProps.getReadFields(0).size() == 3);
 		assertTrue(semProps.getReadFields(0).contains(2));
@@ -480,7 +483,7 @@ public class CoGroupOperatorTest {
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+		CoGroupOperator<?, ?, ?> coGroupOp = tupleDs1.coGroup(tupleDs2)
 				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
 				.with(new DummyTestCoGroupFunction2())
 				.withForwardedFieldsFirst("2;4->0")
@@ -488,26 +491,26 @@ public class CoGroupOperatorTest {
 
 		SemanticProperties semProps = coGroupOp.getSemanticProperties();
 
-		assertTrue(semProps.getForwardingTargetFields(0,0).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 1);
-		assertTrue(semProps.getForwardingTargetFields(0,4).contains(2));
-		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 1);
-		assertTrue(semProps.getForwardingTargetFields(0,6).contains(0));
-
-		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 1);
-		assertTrue(semProps.getForwardingTargetFields(1,2).contains(4));
-		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 2);
-		assertTrue(semProps.getForwardingTargetFields(1,3).contains(1));
-		assertTrue(semProps.getForwardingTargetFields(1,3).contains(3));
-		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
-		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+
+		assertTrue(semProps.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(1, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(1, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(1, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(1, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 6).size() == 0);
 
 		assertTrue(semProps.getReadFields(0).size() == 3);
 		assertTrue(semProps.getReadFields(0).contains(2));
@@ -517,7 +520,7 @@ public class CoGroupOperatorTest {
 		assertTrue(semProps.getReadFields(1) == null);
 	}
 
-	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+	private static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
 		@Override
 		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
 			return new Tuple2<Long, Integer>();
@@ -528,7 +531,7 @@ public class CoGroupOperatorTest {
 	@FunctionAnnotation.ForwardedFieldsSecond("2;4->0")
 	@FunctionAnnotation.ReadFieldsFirst("0;2;4")
 	@FunctionAnnotation.ReadFieldsSecond("1;3")
-	public static class DummyTestCoGroupFunction1
+	private static class DummyTestCoGroupFunction1
 			implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
 						Tuple5<Integer, Long, String, Long, Integer>,
 						Tuple5<Integer, Long, String, Long, Integer>> {
@@ -541,7 +544,7 @@ public class CoGroupOperatorTest {
 	}
 
 	@FunctionAnnotation.ReadFieldsFirst("0;1;2")
-	public static class DummyTestCoGroupFunction2
+	private static class DummyTestCoGroupFunction2
 			implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
 			Tuple5<Integer, Long, String, Long, Integer>,
 			Tuple5<Integer, Long, String, Long, Integer>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
index 59d2d61..22b03fc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -31,6 +32,9 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Tests for {@link DataSet#cross(DataSet)}.
+ */
 public class CrossOperatorTest {
 
 	// TUPLE DATA
@@ -64,11 +68,11 @@ public class CrossOperatorTest {
 		try {
 			ds1.cross(ds2)
 				.projectFirst(0);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCrossProjection21() {
 
@@ -80,7 +84,7 @@ public class CrossOperatorTest {
 		try {
 			ds1.cross(ds2)
 				.projectFirst(0);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -95,12 +99,12 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectFirst(0,3);
-		} catch(Exception e) {
+				.projectFirst(0, 3);
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCrossProjection22() {
 
@@ -111,8 +115,8 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectFirst(0,3);
-		} catch(Exception e) {
+				.projectFirst(0, 3);
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -129,11 +133,11 @@ public class CrossOperatorTest {
 			ds1.cross(ds2)
 				.projectFirst(0)
 				.projectSecond(3);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCrossProjection23() {
 
@@ -146,7 +150,7 @@ public class CrossOperatorTest {
 			ds1.cross(ds2)
 				.projectFirst(0)
 				.projectSecond(3);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -161,15 +165,15 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectFirst(0,2)
-				.projectSecond(1,4)
+				.projectFirst(0, 2)
+				.projectSecond(1, 4)
 				.projectFirst(1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 
 	}
-	
+
 	@Test
 	public void testCrossProjection24() {
 
@@ -180,10 +184,10 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectFirst(0,2)
-				.projectSecond(1,4)
+				.projectFirst(0, 2)
+				.projectSecond(1, 4)
 				.projectFirst(1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 
@@ -199,14 +203,14 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectSecond(0,2)
-				.projectFirst(1,4)
+				.projectSecond(0, 2)
+				.projectFirst(1, 4)
 				.projectFirst(1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCrossProjection25() {
 
@@ -217,10 +221,10 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectSecond(0,2)
-				.projectFirst(1,4)
+				.projectSecond(0, 2)
+				.projectFirst(1, 4)
 				.projectFirst(1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -237,11 +241,11 @@ public class CrossOperatorTest {
 			ds1.cross(ds2)
 				.projectFirst()
 				.projectSecond();
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCrossProjection26() {
 
@@ -254,7 +258,7 @@ public class CrossOperatorTest {
 			ds1.cross(ds2)
 				.projectFirst()
 				.projectSecond();
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -270,12 +274,12 @@ public class CrossOperatorTest {
 		try {
 			ds1.cross(ds2)
 				.projectSecond()
-				.projectFirst(1,4);
-		} catch(Exception e) {
+				.projectFirst(1, 4);
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testCrossProjection27() {
 
@@ -287,13 +291,13 @@ public class CrossOperatorTest {
 		try {
 			ds1.cross(ds2)
 				.projectSecond()
-				.projectFirst(1,4);
-		} catch(Exception e) {
+				.projectFirst(1, 4);
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
 
-	@Test(expected=IndexOutOfBoundsException.class)
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection8() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -304,8 +308,8 @@ public class CrossOperatorTest {
 		ds1.cross(ds2)
 			.projectFirst(5);
 	}
-	
-	@Test(expected=IndexOutOfBoundsException.class)
+
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection28() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -317,7 +321,7 @@ public class CrossOperatorTest {
 			.projectFirst(5);
 	}
 
-	@Test(expected=IndexOutOfBoundsException.class)
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection9() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -329,7 +333,7 @@ public class CrossOperatorTest {
 			.projectSecond(5);
 	}
 
-	@Test(expected=IndexOutOfBoundsException.class)
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection29() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -351,8 +355,8 @@ public class CrossOperatorTest {
 		ds1.cross(ds2)
 			.projectFirst(2);
 	}
-	
-	@Test(expected=IndexOutOfBoundsException.class)
+
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection30() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -375,7 +379,7 @@ public class CrossOperatorTest {
 			.projectSecond(2);
 	}
 
-	@Test(expected=IndexOutOfBoundsException.class)
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection31() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -386,7 +390,7 @@ public class CrossOperatorTest {
 		ds1.cross(ds2)
 			.projectSecond(-1);
 	}
-	
+
 	public void testCrossProjection12() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -398,8 +402,8 @@ public class CrossOperatorTest {
 			.projectSecond(2)
 			.projectFirst(1);
 	}
-	
-	@Test(expected=IndexOutOfBoundsException.class)
+
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection32() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -412,7 +416,7 @@ public class CrossOperatorTest {
 			.projectFirst(-1);
 	}
 
-	@Test(expected=IndexOutOfBoundsException.class)
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection13() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -425,7 +429,7 @@ public class CrossOperatorTest {
 			.projectFirst(5);
 	}
 
-	@Test(expected=IndexOutOfBoundsException.class)
+	@Test(expected = IndexOutOfBoundsException.class)
 	public void testCrossProjection14() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -437,12 +441,12 @@ public class CrossOperatorTest {
 			.projectFirst(0)
 			.projectSecond(5);
 	}
-	
+
 	/*
 	 * ####################################################################
 	 */
 
-	public static class CustomType implements Serializable {
+	private static class CustomType implements Serializable {
 
 		private static final long serialVersionUID = 1L;
 
@@ -450,7 +454,8 @@ public class CrossOperatorTest {
 		public long myLong;
 		public String myString;
 
-		public CustomType() {}
+		public CustomType() {
+		}
 
 		public CustomType(int i, long l, String s) {
 			myInt = i;
@@ -460,7 +465,7 @@ public class CrossOperatorTest {
 
 		@Override
 		public String toString() {
-			return myInt+","+myLong+","+myString;
+			return myInt + "," + myLong + "," + myString;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 0493583..ab8398f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.operator;
 
 import org.apache.flink.api.common.InvalidProgramException;
@@ -25,6 +26,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,6 +35,9 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Tests for {@link DataSet#writeAsText(String)}.
+ */
 public class DataSinkTest {
 
 	// TUPLE DATA
@@ -48,7 +53,7 @@ public class DataSinkTest {
 
 	@Before
 	public void fillPojoData() {
-		if(pojoData.isEmpty()) {
+		if (pojoData.isEmpty()) {
 			pojoData.add(new CustomType());
 		}
 	}
@@ -186,7 +191,7 @@ public class DataSinkTest {
 		final ExecutionEnvironment env = ExecutionEnvironment
 				.getExecutionEnvironment();
 		DataSet<Long> longDs = env
-				.generateSequence(0,2);
+				.generateSequence(0, 2);
 
 		// should work
 		try {
@@ -203,7 +208,7 @@ public class DataSinkTest {
 		final ExecutionEnvironment env = ExecutionEnvironment
 				.getExecutionEnvironment();
 		DataSet<Long> longDs = env
-				.generateSequence(0,2);
+				.generateSequence(0, 2);
 
 		// must not work
 		longDs.writeAsText("/tmp/willNotHappen")
@@ -216,7 +221,7 @@ public class DataSinkTest {
 		final ExecutionEnvironment env = ExecutionEnvironment
 				.getExecutionEnvironment();
 		DataSet<Long> longDs = env
-				.generateSequence(0,2);
+				.generateSequence(0, 2);
 
 		// must not work
 		longDs.writeAsText("/tmp/willNotHappen")
@@ -229,7 +234,7 @@ public class DataSinkTest {
 		final ExecutionEnvironment env = ExecutionEnvironment
 				.getExecutionEnvironment();
 		DataSet<Long> longDs = env
-				.generateSequence(0,2);
+				.generateSequence(0, 2);
 
 		// must not work
 		longDs.writeAsText("/tmp/willNotHappen")

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index 0bbeeb2..b768389 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -18,27 +18,31 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import org.junit.Assert;
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#distinct()}.
+ */
 public class DistinctOperatorTest {
 
 	// TUPLE DATA
-	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
 			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
@@ -46,52 +50,52 @@ public class DistinctOperatorTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
 	// LONG DATA
 	private final List<Long> emptyLongData = new ArrayList<Long>();
-	
+
 	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
-	
-	@Test  
+
+	@Test
 	public void testDistinctByKeyFields1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.distinct(0);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
-	@Test(expected = InvalidProgramException.class)  
+
+	@Test(expected = InvalidProgramException.class)
 	public void testDistinctByKeyFields2() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
 		// should not work: distinct on basic type
 		longDs.distinct(0);
 	}
-	
-	@Test(expected = InvalidProgramException.class)  
+
+	@Test(expected = InvalidProgramException.class)
 	public void testDistinctByKeyFields3() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		this.customTypeData.add(new CustomType());
-		
+
 		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 		// should not work: distinct on custom type
 		customDs.distinct(0);
-		
+
 	}
-	
+
 	@Test
 	public void testDistinctByKeyFields4() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
@@ -101,17 +105,17 @@ public class DistinctOperatorTest {
 
 	@Test
 	public void testDistinctByKeyFields5() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		this.customTypeData.add(new CustomType());
-		
+
 		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 
 		// should work
 		customDs.distinct();
 	}
-	
+
 	@Test(expected = IndexOutOfBoundsException.class)
 	public void testDistinctByKeyFields6() {
 
@@ -134,29 +138,29 @@ public class DistinctOperatorTest {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	@SuppressWarnings("serial")
 	public void testDistinctByKeySelector1() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
-		
+
 		try {
 			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 			// should work
 			customDs.distinct(
 					new KeySelector<DistinctOperatorTest.CustomType, Long>() {
-	
+
 						@Override
 						public Long getKey(CustomType value) {
 							return value.myLong;
 					}
 			});
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 	}
 
 	@Test
@@ -166,7 +170,7 @@ public class DistinctOperatorTest {
 			DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
 			// should work
 			longDs.distinct();
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -206,14 +210,17 @@ public class DistinctOperatorTest {
 		public List<Integer> myInts;
 	}
 
+	/**
+	 * Custom data type, for testing purposes.
+	 */
 	public static class CustomType implements Serializable {
-		
+
 		private static final long serialVersionUID = 1L;
-		
+
 		public int myInt;
 		public long myLong;
 		public String myString;
-		
+
 		public CustomType() {}
 
 		public CustomType(int i, long l, String s) {
@@ -221,11 +228,11 @@ public class DistinctOperatorTest {
 			myLong = l;
 			myString = s;
 		}
-		
+
 		@Override
 		public String toString() {
-			return myInt+","+myLong+","+myString;
+			return myInt + "," + myLong + "," + myString;
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
index aaf744c..b02b318 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -28,17 +25,24 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#first(int)}.
+ */
 public class FirstNOperatorTest {
 
 	// TUPLE DATA
-	
-	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
 			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
@@ -46,127 +50,127 @@ public class FirstNOperatorTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
 	@Test
 	public void testUngroupedFirstN() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.first(1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should work
 		try {
 			tupleDs.first(10);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work n == 0
 		try {
 			tupleDs.first(0);
 			Assert.fail();
-		} catch(InvalidProgramException ipe) {
+		} catch (InvalidProgramException ipe) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work n == -1
 		try {
 			tupleDs.first(-1);
 			Assert.fail();
-		} catch(InvalidProgramException ipe) {
+		} catch (InvalidProgramException ipe) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 	}
-	
+
 	@Test
 	public void testGroupedFirstN() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.groupBy(2).first(1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should work
 		try {
-			tupleDs.groupBy(1,3).first(10);
-		} catch(Exception e) {
+			tupleDs.groupBy(1, 3).first(10);
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work n == 0
 		try {
 			tupleDs.groupBy(0).first(0);
 			Assert.fail();
-		} catch(InvalidProgramException ipe) {
+		} catch (InvalidProgramException ipe) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work n == -1
 		try {
 			tupleDs.groupBy(2).first(-1);
 			Assert.fail();
-		} catch(InvalidProgramException ipe) {
+		} catch (InvalidProgramException ipe) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	public void testGroupedSortedFirstN() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should work
 		try {
-			tupleDs.groupBy(1,3).sortGroup(4, Order.ASCENDING).first(10);
-		} catch(Exception e) {
+			tupleDs.groupBy(1, 3).sortGroup(4, Order.ASCENDING).first(10);
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work n == 0
 		try {
 			tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0);
 			Assert.fail();
-		} catch(InvalidProgramException ipe) {
+		} catch (InvalidProgramException ipe) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
-		
+
 		// should not work n == -1
 		try {
 			tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1);
 			Assert.fail();
-		} catch(InvalidProgramException ipe) {
+		} catch (InvalidProgramException ipe) {
 			// we're good here
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
index 9f2aa41..d52f59f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -28,11 +28,15 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Tests for {@link DataSet#fullOuterJoin(DataSet)}.
+ */
 public class FullOuterJoinOperatorTest {
 
 	// TUPLE DATA
@@ -201,7 +205,6 @@ public class FullOuterJoinOperatorTest {
 		this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
 	}
 
-
 	private void testFullOuterStrategies(JoinHint hint) {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -214,13 +217,12 @@ public class FullOuterJoinOperatorTest {
 				.with(new DummyJoin());
 	}
 
-	
 	/*
 	 * ####################################################################
 	 */
 
 	@SuppressWarnings("serial")
-	public static class DummyJoin implements
+	private static class DummyJoin implements
 			JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
 
 		@Override
@@ -230,7 +232,7 @@ public class FullOuterJoinOperatorTest {
 	}
 
 	@SuppressWarnings("serial")
-	public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+	private static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
 
 		@Override
 		public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
@@ -239,7 +241,7 @@ public class FullOuterJoinOperatorTest {
 	}
 
 	@SuppressWarnings("serial")
-	public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+	private static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
 
 		@Override
 		public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
index 4870d29..336219b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -38,13 +39,16 @@ import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link DataSet#combineGroup(GroupCombineFunction)}.
+ */
 @SuppressWarnings("serial")
 public class GroupCombineOperatorTest {
 
 	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
 			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
@@ -52,14 +56,14 @@ public class GroupCombineOperatorTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
 	@Test
 	public void testSemanticPropsWithKeySelector1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.combineGroup(new DummyGroupCombineFunction1());
 
@@ -95,7 +99,7 @@ public class GroupCombineOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
 						.combineGroup(new DummyGroupCombineFunction1());
@@ -134,7 +138,7 @@ public class GroupCombineOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.combineGroup(new DummyGroupCombineFunction2())
 							.withForwardedFields("0->4;1;1->3;2");
@@ -171,7 +175,7 @@ public class GroupCombineOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
 						.combineGroup(new DummyGroupCombineFunction2())
@@ -211,7 +215,7 @@ public class GroupCombineOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.combineGroup(new DummyGroupCombineFunction3())
 						.withForwardedFields("4->0;3;3->1;2");
@@ -245,7 +249,7 @@ public class GroupCombineOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
 						.combineGroup(new DummyGroupCombineFunction3())
@@ -282,7 +286,7 @@ public class GroupCombineOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.combineGroup(new DummyGroupCombineFunction4());
 
@@ -308,7 +312,7 @@ public class GroupCombineOperatorTest {
 		assertTrue(semProps.getReadFields(0) == null);
 	}
 
-	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+	private static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
 		@Override
 		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
 			return new Tuple2<Long, Integer>();
@@ -317,27 +321,27 @@ public class GroupCombineOperatorTest {
 
 	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
 	@FunctionAnnotation.ReadFields("0;3;4")
-	public static class DummyGroupCombineFunction1 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupCombineFunction1 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}
 	}
 
 	@FunctionAnnotation.ReadFields("0;3;4")
-	public static class DummyGroupCombineFunction2 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupCombineFunction2 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}
 	}
 
-	public static class DummyGroupCombineFunction3 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupCombineFunction3 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}
 	}
 
 	@FunctionAnnotation.NonForwardedFields("2;4")
-	public static class DummyGroupCombineFunction4 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupCombineFunction4 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
index 0bfe566..62380eb 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -38,13 +39,16 @@ import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link DataSet#reduceGroup(GroupReduceFunction)}.
+ */
 @SuppressWarnings("serial")
 public class GroupReduceOperatorTest {
 
 	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
 			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
@@ -52,14 +56,14 @@ public class GroupReduceOperatorTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
 	@Test
 	public void testSemanticPropsWithKeySelector1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.reduceGroup(new DummyGroupReduceFunction1());
 
@@ -95,7 +99,7 @@ public class GroupReduceOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
 						.reduceGroup(new DummyGroupReduceFunction1());
@@ -134,7 +138,7 @@ public class GroupReduceOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.reduceGroup(new DummyGroupReduceFunction2())
 							.withForwardedFields("0->4;1;1->3;2");
@@ -171,7 +175,7 @@ public class GroupReduceOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
 						.reduceGroup(new DummyGroupReduceFunction2())
@@ -211,7 +215,7 @@ public class GroupReduceOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.reduceGroup(new DummyGroupReduceFunction3())
 						.withForwardedFields("4->0;3;3->1;2");
@@ -245,7 +249,7 @@ public class GroupReduceOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
 						.reduceGroup(new DummyGroupReduceFunction3())
@@ -282,7 +286,7 @@ public class GroupReduceOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
 				tupleDs.groupBy(new DummyTestKeySelector())
 						.reduceGroup(new DummyGroupReduceFunction4());
 
@@ -308,7 +312,7 @@ public class GroupReduceOperatorTest {
 		assertTrue(semProps.getReadFields(0) == null);
 	}
 
-	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+	private static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
 		@Override
 		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
 			return new Tuple2<Long, Integer>();
@@ -317,27 +321,27 @@ public class GroupReduceOperatorTest {
 
 	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
 	@FunctionAnnotation.ReadFields("0;3;4")
-	public static class DummyGroupReduceFunction1 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupReduceFunction1 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}
 	}
 
 	@FunctionAnnotation.ReadFields("0;3;4")
-	public static class DummyGroupReduceFunction2 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupReduceFunction2 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}
 	}
 
-	public static class DummyGroupReduceFunction3 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupReduceFunction3 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}
 	}
 
 	@FunctionAnnotation.NonForwardedFields("2;4")
-	public static class DummyGroupReduceFunction4 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+	private static class DummyGroupReduceFunction4 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
 		@Override
 		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 18b17b5..0117cf1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -34,16 +30,24 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#groupBy(int...)}.
+ */
 public class GroupingTest {
 
 	// TUPLE DATA
-	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
 			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
@@ -62,64 +66,64 @@ public class GroupingTest {
 
 	// LONG DATA
 	private final List<Long> emptyLongData = new ArrayList<Long>();
-	
+
 	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
 
 	private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
 			new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
-	
+
 	private final List<Tuple2<byte[], byte[]>> byteArrayData = new ArrayList<Tuple2<byte[], byte[]>>();
 
-	@Test  
+	@Test
 	public void testGroupByKeyFields1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.groupBy(0);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
-	@Test(expected = InvalidProgramException.class)  
+
+	@Test(expected = InvalidProgramException.class)
 	public void testGroupByKeyFields2() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
 		// should not work: groups on basic type
 		longDs.groupBy(0);
 	}
-	
-	@Test(expected = InvalidProgramException.class)  
+
+	@Test(expected = InvalidProgramException.class)
 	public void testGroupByKeyFields3() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		this.customTypeData.add(new CustomType());
-		
+
 		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 		// should not work: groups on custom type
 		customDs.groupBy(0);
-		
+
 	}
-	
+
 	@Test(expected = IndexOutOfBoundsException.class)
 	public void testGroupByKeyFields4() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should not work, key out of tuple bounds
 		tupleDs.groupBy(5);
 	}
-	
+
 	@Test(expected = IndexOutOfBoundsException.class)
 	public void testGroupByKeyFields5() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
@@ -148,7 +152,7 @@ public class GroupingTest {
 		// should work
 		try {
 			ds.groupBy("myInt");
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -198,7 +202,7 @@ public class GroupingTest {
 		// should work
 		try {
 			ds.groupBy("nested.myInt");
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -212,48 +216,48 @@ public class GroupingTest {
 		// should not work, key out of tuple bounds
 		ds.groupBy("nested.myNonExistent");
 	}
-	
+
 	@Test
 	@SuppressWarnings("serial")
 	public void testGroupByKeySelector1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
-		
+
 		try {
 			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 			// should work
 			customDs.groupBy(
 					new KeySelector<GroupingTest.CustomType, Long>() {
-	
+
 						@Override
 						public Long getKey(CustomType value) {
 							return value.myLong;
 					}
 			});
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test
 	@SuppressWarnings("serial")
 	public void testGroupByKeySelector2() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
-		
+
 		try {
 			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 			// should work
 			customDs.groupBy(
 					new KeySelector<GroupingTest.CustomType, Tuple2<Integer, Long>>() {
 						@Override
-						public Tuple2<Integer,Long> getKey(CustomType value) {
+						public Tuple2<Integer, Long> getKey(CustomType value) {
 							return new Tuple2<Integer, Long>(value.myInt, value.myLong);
 					}
 			});
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -261,7 +265,7 @@ public class GroupingTest {
 	@Test
 	@SuppressWarnings("serial")
 	public void testGroupByKeySelector3() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
 
@@ -275,7 +279,7 @@ public class GroupingTest {
 							return value;
 						}
 					});
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -297,7 +301,7 @@ public class GroupingTest {
 							return new Tuple2<Integer, CustomType>(value.myInt, value);
 						}
 					});
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -319,38 +323,38 @@ public class GroupingTest {
 					}
 				});
 	}
-	
+
 	@Test
 	public void testGroupSortKeyFields1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
-	
+
 	@Test(expected = IndexOutOfBoundsException.class)
 	public void testGroupSortKeyFields2() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should not work, field index out of bounds
 		tupleDs.groupBy(0).sortGroup(5, Order.ASCENDING);
-		
+
 	}
-	
+
 	@Test(expected = InvalidProgramException.class)
 	public void testGroupSortKeyFields3() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
-		
+
 		// should not work: sorted groups on groupings by key selectors
 		longDs.groupBy(new KeySelector<Long, Long>() {
 			private static final long serialVersionUID = 1L;
@@ -359,9 +363,9 @@ public class GroupingTest {
 			public Long getKey(Long value) {
 				return value;
 			}
-			
+
 		}).sortGroup(0, Order.ASCENDING);
-		
+
 	}
 
 	@Test(expected = InvalidProgramException.class)
@@ -387,17 +391,17 @@ public class GroupingTest {
 		tupleDs.groupBy(0)
 				.sortGroup(3, Order.ASCENDING);
 	}
-	
+
 	@Test
 	public void testChainedGroupSortKeyFields() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should work
 		try {
 			tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING).sortGroup(2, Order.DESCENDING);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -412,7 +416,7 @@ public class GroupingTest {
 		// should work
 		try {
 			tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -427,7 +431,7 @@ public class GroupingTest {
 		// should work
 		try {
 			tupleDs.groupBy("f0").sortGroup("f2.myString", Order.ASCENDING);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -444,7 +448,7 @@ public class GroupingTest {
 			tupleDs.groupBy("f0")
 					.sortGroup("f2.myString", Order.ASCENDING)
 					.sortGroup("f1", Order.DESCENDING);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			Assert.fail();
 		}
 	}
@@ -496,7 +500,7 @@ public class GroupingTest {
 
 		// should not work
 		tupleDs.groupBy(
-				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+				new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long>() {
 					@Override
 					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
 						return value.f1;
@@ -521,7 +525,7 @@ public class GroupingTest {
 
 		// should not work
 		tupleDs.groupBy(
-				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+				new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long>() {
 					@Override
 					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
 						return value.f1;
@@ -546,7 +550,7 @@ public class GroupingTest {
 
 		// should not work
 		tupleDs.groupBy(
-				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+				new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long>() {
 					@Override
 					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
 						return value.f1;
@@ -593,19 +597,25 @@ public class GroupingTest {
 		dataSet.groupBy("*");
 	}
 
-
+	/**
+	 * Custom data type, for testing purposes.
+	 */
 	public static class CustomType implements Serializable {
-		
+
+		/**
+		 * Custom nested data type, for testing purposes.
+		 */
 		public static class Nest {
 			public int myInt;
 		}
+
 		private static final long serialVersionUID = 1L;
-		
+
 		public int myInt;
 		public long myLong;
 		public String myString;
 		public Nest nested;
-		
+
 		public CustomType() {}
 
 		public CustomType(int i, long l, String s) {
@@ -613,13 +623,16 @@ public class GroupingTest {
 			myLong = l;
 			myString = s;
 		}
-		
+
 		@Override
 		public String toString() {
-			return myInt+","+myLong+","+myString;
+			return myInt + "," + myLong + "," + myString;
 		}
 	}
 
+	/**
+	 * Custom non-nested data type, for testing purposes.
+	 */
 	public static class CustomType2 implements Serializable {
 
 		public int myInt;