You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:17 UTC

[17/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
index 6350533..add0cd1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.List;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.java.DataSet;
@@ -27,6 +26,11 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
+import java.util.List;
+
+/**
+ * Test iteration with termination criterion.
+ */
 public class IterationTerminationWithTerminationTail extends JavaProgramTestBase {
 	private static final String EXPECTED = "22\n";
 
@@ -48,7 +52,7 @@ public class IterationTerminationWithTerminationTail extends JavaProgramTestBase
 		containsResultAsText(result, EXPECTED);
 	}
 
-	public static final class SumReducer implements GroupReduceFunction<String, String> {
+	private static final class SumReducer implements GroupReduceFunction<String, String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -61,7 +65,7 @@ public class IterationTerminationWithTerminationTail extends JavaProgramTestBase
 		}
 	}
 
-	public static class TerminationFilter implements FilterFunction<String> {
+	private static class TerminationFilter implements FilterFunction<String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
index 5a2df3f..4f749b6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
@@ -18,16 +18,19 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.List;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import static org.apache.flink.test.util.TestBaseUtils.containsResultAsText;
 import org.apache.flink.util.Collector;
 
+import java.util.List;
+
+/**
+ * Test iteration with termination criterion consuming the iteration tail.
+ */
 public class IterationTerminationWithTwoTails extends JavaProgramTestBase {
 	private static final String EXPECTED = "22\n";
 
@@ -49,7 +52,7 @@ public class IterationTerminationWithTwoTails extends JavaProgramTestBase {
 		containsResultAsText(result, EXPECTED);
 	}
 
-	public static final class SumReducer implements GroupReduceFunction<String, String> {
+	private static final class SumReducer implements GroupReduceFunction<String, String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -62,7 +65,7 @@ public class IterationTerminationWithTwoTails extends JavaProgramTestBase {
 		}
 	}
 
-	public static class TerminationFilter implements FilterFunction<String> {
+	private static class TerminationFilter implements FilterFunction<String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index ab66f31..3228b73 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.List;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.util.List;
+
+/**
+ * Test iteration with an all-reduce.
+ */
 public class IterationWithAllReducerITCase extends JavaProgramTestBase {
 	private static final String EXPECTED = "1\n";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index c283df1..2ba1a8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -24,12 +24,15 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.PointFormatter;
-import org.apache.flink.test.util.PointInFormat;
 import org.apache.flink.test.util.CoordVector;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
 import org.apache.flink.util.Collector;
 
+/**
+ * Test iteration with operator chaining.
+ */
 public class IterationWithChainingITCase extends JavaProgramTestBase {
 
 	private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
@@ -48,13 +51,13 @@ public class IterationWithChainingITCase extends JavaProgramTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(4);
 
-		DataSet<Tuple2<Integer, CoordVector>> initialInput
-				= env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input");
+		DataSet<Tuple2<Integer, CoordVector>> initialInput =
+				env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input");
 
 		IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2).name("Loop");
 
-		DataSet<Tuple2<Integer, CoordVector>> identity
-				= iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+		DataSet<Tuple2<Integer, CoordVector>> identity =
+				iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
 					@Override
 					public void reduce(Iterable<Tuple2<Integer, CoordVector>> values, Collector<Tuple2<Integer, CoordVector>> out) throws Exception {
 						for (Tuple2<Integer, CoordVector> value : values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index 8756429..e44d870 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -27,11 +25,16 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.test.util.PointFormatter;
 import org.apache.flink.test.util.PointInFormat;
-import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
+/**
+ * Test iteration with union.
+ */
 public class IterationWithUnionITCase extends JavaProgramTestBase {
 
 	private static final String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
@@ -39,13 +42,12 @@ public class IterationWithUnionITCase extends JavaProgramTestBase {
 	protected String dataPath;
 	protected String resultPath;
 
-
 	@Override
 	protected void preSubmit() throws Exception {
 		dataPath = createTempFile("datapoints.txt", DATAPOINTS);
 		resultPath = getTempDirPath("union_iter_result");
 	}
-	
+
 	@Override
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(DATAPOINTS + DATAPOINTS + DATAPOINTS + DATAPOINTS, resultPath);
@@ -54,18 +56,18 @@ public class IterationWithUnionITCase extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		DataSet<Tuple2<Integer, CoordVector>> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
-		
+
 		IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2);
-		
+
 		DataSet<Tuple2<Integer, CoordVector>> result = iteration.union(iteration).map(new IdentityMapper());
-		
+
 		iteration.closeWith(result).writeAsFormattedText(this.resultPath, new PointFormatter());
-		
+
 		env.execute();
 	}
-	
+
 	static final class IdentityMapper implements MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
index 7bd9934..77e2663 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.examples.java.clustering.KMeans;
-import org.apache.flink.examples.java.clustering.KMeans.Point;
 import org.apache.flink.examples.java.clustering.KMeans.Centroid;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.examples.java.clustering.KMeans.Point;
 import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.util.List;
 import java.util.Locale;
 
+/**
+ * Test KMeans clustering with a broadcast set.
+ */
 public class KMeansWithBroadcastSetITCase extends JavaProgramTestBase {
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
index e6e91f6..cc3fdc6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
@@ -18,41 +18,44 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.optimizer.iterations.MultipleJoinsWithSolutionSetCompilerTest;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
 
+/**
+ * Test multiple joins with the solution set.
+ */
 public class MultipleSolutionSetJoinsITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		
-		final int NUM_ITERS = 4;
-		final double expectedFactor = (int) Math.pow(7, NUM_ITERS);
-		
+
+		final int numIters = 4;
+		final double expectedFactor = (int) Math.pow(7, numIters);
+
 		// this is an artificial program, it does not compute anything sensical
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple2<Long, Double>> initialData = env.fromElements(new Tuple2<Long, Double>(1L, 1.0), new Tuple2<Long, Double>(2L, 2.0),
 															new Tuple2<Long, Double>(3L, 3.0), new Tuple2<Long, Double>(4L, 4.0),
 															new Tuple2<Long, Double>(5L, 5.0), new Tuple2<Long, Double>(6L, 6.0));
-		
-		DataSet<Tuple2<Long, Double>> result = MultipleJoinsWithSolutionSetCompilerTest.constructPlan(initialData, NUM_ITERS);
-		
-		List<Tuple2<Long, Double>> resultCollector = new ArrayList<Tuple2<Long,Double>>();
-		result.output(new LocalCollectionOutputFormat<Tuple2<Long,Double>>(resultCollector));
-		
+
+		DataSet<Tuple2<Long, Double>> result = MultipleJoinsWithSolutionSetCompilerTest.constructPlan(initialData, numIters);
+
+		List<Tuple2<Long, Double>> resultCollector = new ArrayList<Tuple2<Long, Double>>();
+		result.output(new LocalCollectionOutputFormat<>(resultCollector));
+
 		env.execute();
-		
+
 		for (Tuple2<Long, Double> tuple : resultCollector) {
 			Assert.assertEquals(expectedFactor * tuple.f0, tuple.f1.doubleValue(), 0.0);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
index c987dfd..a402747 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -37,6 +38,9 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/**
+ * Test for duplicate elimination in the solution set.
+ */
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
@@ -66,7 +70,7 @@ public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
 
 			List<Integer> result = iter
 					.closeWith(iter.getWorkset(), iter.getWorkset())
-					.map(new MapFunction<Tuple2<Long,Long>, Integer>() {
+					.map(new MapFunction<Tuple2<Long, Long>, Integer>() {
 						@Override
 						public Integer map(Tuple2<Long, Long> value) {
 							return value.f0.intValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
index 766a422..0228aef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
@@ -27,34 +27,35 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * Test iterations referenced from the static path of other iterations.
+ */
 public class StaticlyNestedIterationsITCase extends JavaProgramTestBase {
 
-	
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		DataSet<Long> data1 = env.generateSequence(1, 100);
 		DataSet<Long> data2 = env.generateSequence(1, 100);
-		
+
 		IterativeDataSet<Long> firstIteration = data1.iterate(100);
-		
+
 		DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdMapper()));
-		
-		
+
 		IterativeDataSet<Long> mainIteration = data2.map(new IdMapper()).iterate(100);
-		
+
 		DataSet<Long> joined = mainIteration.join(firstResult)
 				.where(new IdKeyExtractor()).equalTo(new IdKeyExtractor())
 				.with(new Joiner());
-		
+
 		DataSet<Long> mainResult = mainIteration.closeWith(joined);
-		
+
 		mainResult.output(new DiscardingOutputFormat<Long>());
-		
+
 		env.execute();
 	}
-	
+
 	private static class IdKeyExtractor implements KeySelector<Long, Long> {
 
 		private static final long serialVersionUID = 1L;
@@ -64,21 +65,21 @@ public class StaticlyNestedIterationsITCase extends JavaProgramTestBase {
 			return value;
 		}
 	}
-	
+
 	private static class IdMapper implements MapFunction<Long, Long> {
-		
+
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Long map(Long value) {
 			return value;
 		}
 	}
-	
+
 	private static class Joiner implements JoinFunction<Long, Long, Long> {
-		
+
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Long join(Long first, Long second) {
 			return first;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
index fa8643f..01e578c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
@@ -18,36 +18,39 @@
 
 package org.apache.flink.test.iterative;
 
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test union between static and dynamic path in an iteration.
+ */
 public class UnionStaticDynamicIterationITCase  extends JavaProgramTestBase {
-	
+
 	private final ArrayList<Long> result = new ArrayList<Long>();
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		DataSet<Long> inputStatic = env.generateSequence(1, 4);
 		DataSet<Long> inputIteration = env.generateSequence(1, 4);
-		
+
 		IterativeDataSet<Long> iteration = inputIteration.iterate(3);
-		
+
 		DataSet<Long> result = iteration.closeWith(inputStatic.union(inputStatic).union(iteration.union(iteration)));
-			
+
 		result.output(new LocalCollectionOutputFormat<Long>(this.result));
-		
+
 		env.execute();
 	}
-	
+
 	@Override
 	protected void postSubmit() throws Exception {
 		assertEquals(88, result.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
index 3bced25..2403cd9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
@@ -18,33 +18,33 @@
 
 package org.apache.flink.test.iterative.aggregators;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.junit.Assert.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
 
 /**
- * Connected Components test case that uses a parameterizable convergence criterion
+ * Connected Components test case that uses a parameterizable convergence criterion.
  */
 @RunWith(Parameterized.class)
 @SuppressWarnings("serial")
@@ -55,181 +55,181 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
 	}
 
 	final List<Tuple2<Long, Long>> verticesInput = Arrays.asList(
-			new Tuple2<>(1l,1l),
-			new Tuple2<>(2l,2l),
-			new Tuple2<>(3l,3l),
-			new Tuple2<>(4l,4l),
-			new Tuple2<>(5l,5l),
-			new Tuple2<>(6l,6l),
-			new Tuple2<>(7l,7l),
-			new Tuple2<>(8l,8l),
-			new Tuple2<>(9l,9l)
+		new Tuple2<>(1L, 1L),
+		new Tuple2<>(2L, 2L),
+		new Tuple2<>(3L, 3L),
+		new Tuple2<>(4L, 4L),
+		new Tuple2<>(5L, 5L),
+		new Tuple2<>(6L, 6L),
+		new Tuple2<>(7L, 7L),
+		new Tuple2<>(8L, 8L),
+		new Tuple2<>(9L, 9L)
 	);
 
 	final List<Tuple2<Long, Long>> edgesInput = Arrays.asList(
-			new Tuple2<>(1l,2l),
-			new Tuple2<>(1l,3l),
-			new Tuple2<>(2l,3l),
-			new Tuple2<>(2l,4l),
-			new Tuple2<>(2l,1l),
-			new Tuple2<>(3l,1l),
-			new Tuple2<>(3l,2l),
-			new Tuple2<>(4l,2l),
-			new Tuple2<>(4l,6l),
-			new Tuple2<>(5l,6l),
-			new Tuple2<>(6l,4l),
-			new Tuple2<>(6l,5l),
-			new Tuple2<>(7l,8l),
-			new Tuple2<>(7l,9l),
-			new Tuple2<>(8l,7l),
-			new Tuple2<>(8l,9l),
-			new Tuple2<>(9l,7l),
-			new Tuple2<>(9l,8l)
+		new Tuple2<>(1L, 2L),
+		new Tuple2<>(1L, 3L),
+		new Tuple2<>(2L, 3L),
+		new Tuple2<>(2L, 4L),
+		new Tuple2<>(2L, 1L),
+		new Tuple2<>(3L, 1L),
+		new Tuple2<>(3L, 2L),
+		new Tuple2<>(4L, 2L),
+		new Tuple2<>(4L, 6L),
+		new Tuple2<>(5L, 6L),
+		new Tuple2<>(6L, 4L),
+		new Tuple2<>(6L, 5L),
+		new Tuple2<>(7L, 8L),
+		new Tuple2<>(7L, 9L),
+		new Tuple2<>(8L, 7L),
+		new Tuple2<>(8L, 9L),
+		new Tuple2<>(9L, 7L),
+		new Tuple2<>(9L, 8L)
 	);
 
 	final List<Tuple2<Long, Long>> expectedResult = Arrays.asList(
-			new Tuple2<>(1L,1L),
-			new Tuple2<>(2L,1L),
-			new Tuple2<>(3L,1L),
-			new Tuple2<>(4L,1L),
-			new Tuple2<>(5L,2L),
-			new Tuple2<>(6L,1L),
-			new Tuple2<>(7L,7L),
-			new Tuple2<>(8L,7L),
-			new Tuple2<>(9L,7L)
+		new Tuple2<>(1L, 1L),
+		new Tuple2<>(2L, 1L),
+		new Tuple2<>(3L, 1L),
+		new Tuple2<>(4L, 1L),
+		new Tuple2<>(5L, 2L),
+		new Tuple2<>(6L, 1L),
+		new Tuple2<>(7L, 7L),
+		new Tuple2<>(8L, 7L),
+		new Tuple2<>(9L, 7L)
 	);
 
 	@Test
 	public void testConnectedComponentsWithParametrizableConvergence() throws Exception {
 
-			// name of the aggregator that checks for convergence
-			final String UPDATED_ELEMENTS = "updated.elements.aggr";
+		// name of the aggregator that checks for convergence
+		final String updatedElements = "updated.elements.aggr";
+
+		// the iteration stops if less than this number of elements change value
+		final long convergenceThreshold = 3;
 
-			// the iteration stops if less than this number of elements change value
-			final long convergence_threshold = 3;
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
+		DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
 
-			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
-			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
+		IterativeDataSet<Tuple2<Long, Long>> iteration = initialSolutionSet.iterate(10);
 
-			IterativeDataSet<Tuple2<Long, Long>> iteration = initialSolutionSet.iterate(10);
+		// register the convergence criterion
+		iteration.registerAggregationConvergenceCriterion(updatedElements,
+			new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergenceThreshold));
 
-			// register the convergence criterion
-			iteration.registerAggregationConvergenceCriterion(UPDATED_ELEMENTS,
-					new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergence_threshold));
+		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
+			.with(new NeighborWithComponentIDJoin())
+			.groupBy(0).min(1);
 
-			DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
-					.with(new NeighborWithComponentIDJoin())
-					.groupBy(0).min(1);
+		DataSet<Tuple2<Long, Long>> updatedComponentId =
+			verticesWithNewComponents.join(iteration).where(0).equalTo(0)
+				.flatMap(new MinimumIdFilter(updatedElements));
 
-			DataSet<Tuple2<Long, Long>> updatedComponentId =
-					verticesWithNewComponents.join(iteration).where(0).equalTo(0)
-							.flatMap(new MinimumIdFilter(UPDATED_ELEMENTS));
+		List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
+		Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
 
-			List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
-			Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
-			
-			assertEquals(expectedResult, result);
+		assertEquals(expectedResult, result);
 	}
 
 	@Test
 	public void testDeltaConnectedComponentsWithParametrizableConvergence() throws Exception {
 
-			// name of the aggregator that checks for convergence
-			final String UPDATED_ELEMENTS = "updated.elements.aggr";
+		// name of the aggregator that checks for convergence
+		final String updatedElements = "updated.elements.aggr";
 
-			// the iteration stops if less than this number of elements change value
-			final long convergence_threshold = 3;
+		// the iteration stops if less than this number of elements change value
+		final long convergenceThreshold = 3;
 
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
-			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
+		DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
+		DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
 
-			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
-					initialSolutionSet.iterateDelta(initialSolutionSet, 10, 0);
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+			initialSolutionSet.iterateDelta(initialSolutionSet, 10, 0);
 
-			// register the convergence criterion
-			iteration.registerAggregationConvergenceCriterion(UPDATED_ELEMENTS,
-					new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergence_threshold));
+		// register the convergence criterion
+		iteration.registerAggregationConvergenceCriterion(updatedElements,
+			new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergenceThreshold));
 
-			DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.getWorkset().join(edges).where(0).equalTo(0)
-					.with(new NeighborWithComponentIDJoin())
-					.groupBy(0).min(1);
+		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.getWorkset().join(edges).where(0).equalTo(0)
+			.with(new NeighborWithComponentIDJoin())
+			.groupBy(0).min(1);
 
-			DataSet<Tuple2<Long, Long>> updatedComponentId =
-					verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
-							.flatMap(new MinimumIdFilter(UPDATED_ELEMENTS));
+		DataSet<Tuple2<Long, Long>> updatedComponentId =
+			verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
+				.flatMap(new MinimumIdFilter(updatedElements));
 
-			List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId, updatedComponentId).collect();
-			Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
+		List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId, updatedComponentId).collect();
+		Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
 
-			assertEquals(expectedResult, result);
+		assertEquals(expectedResult, result);
 	}
-	
+
 	@Test
 	public void testParameterizableAggregator() throws Exception {
 
-			final int MAX_ITERATIONS = 5;
-			final String AGGREGATOR_NAME = "elements.in.component.aggregator";
-			final long componentId = 1l;
+		final int maxIterations = 5;
+		final String aggregatorName = "elements.in.component.aggregator";
+		final long componentId = 1L;
 
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
-			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
+		DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
+		DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
 
-			IterativeDataSet<Tuple2<Long, Long>> iteration =
-					initialSolutionSet.iterate(MAX_ITERATIONS);
+		IterativeDataSet<Tuple2<Long, Long>> iteration =
+			initialSolutionSet.iterate(maxIterations);
 
-			// register the aggregator
-			iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregatorWithParameter(componentId));
+		// register the aggregator
+		iteration.registerAggregator(aggregatorName, new LongSumAggregatorWithParameter(componentId));
 
-			DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
-					.with(new NeighborWithComponentIDJoin())
-					.groupBy(0).min(1);
+		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
+			.with(new NeighborWithComponentIDJoin())
+			.groupBy(0).min(1);
 
-			DataSet<Tuple2<Long, Long>> updatedComponentId =
-					verticesWithNewComponents.join(iteration).where(0).equalTo(0)
-							.flatMap(new MinimumIdFilterCounting(AGGREGATOR_NAME));
+		DataSet<Tuple2<Long, Long>> updatedComponentId =
+			verticesWithNewComponents.join(iteration).where(0).equalTo(0)
+				.flatMap(new MinimumIdFilterCounting(aggregatorName));
 
-			List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
+		List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
 
-			Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
+		Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
 
-			List<Tuple2<Long, Long>> expectedResult = Arrays.asList(
-					new Tuple2<>(1L,1L),
-					new Tuple2<>(2L,1L),
-					new Tuple2<>(3L,1L),
-					new Tuple2<>(4L,1L),
-					new Tuple2<>(5L,1L),
-					new Tuple2<>(6L,1L),
-					new Tuple2<>(7L,7L),
-					new Tuple2<>(8L,7L),
-					new Tuple2<>(9L,7L)
-			);
+		List<Tuple2<Long, Long>> expectedResult = Arrays.asList(
+			new Tuple2<>(1L, 1L),
+			new Tuple2<>(2L, 1L),
+			new Tuple2<>(3L, 1L),
+			new Tuple2<>(4L, 1L),
+			new Tuple2<>(5L, 1L),
+			new Tuple2<>(6L, 1L),
+			new Tuple2<>(7L, 7L),
+			new Tuple2<>(8L, 7L),
+			new Tuple2<>(9L, 7L)
+		);
 
-			// check program result
-			assertEquals(expectedResult, result);
+		// check program result
+		assertEquals(expectedResult, result);
 
-			// check aggregators
-			long[] aggr_values = MinimumIdFilterCounting.aggr_value;
+		// check aggregators
+		long[] aggrValues = MinimumIdFilterCounting.aggr_value;
 
-			// note that position 0 has the end result from superstep 1, retrieved at the start of iteration 2
-			// position one as superstep 2, retrieved at the start of iteration 3.
-			// the result from iteration 5 is not available, because no iteration 6 happens
-			assertEquals(3, aggr_values[0]);
-			assertEquals(4, aggr_values[1]);
-			assertEquals(5, aggr_values[2]);
-			assertEquals(6, aggr_values[3]);
+		// note that position 0 has the end result from superstep 1, retrieved at the start of iteration 2
+		// position one has superstep 2, retrieved at the start of iteration 3.
+		// the result from iteration 5 is not available, because no iteration 6 happens
+		assertEquals(3, aggrValues[0]);
+		assertEquals(4, aggrValues[1]);
+		assertEquals(5, aggrValues[2]);
+		assertEquals(6, aggrValues[3]);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Test Functions
 	// ------------------------------------------------------------------------
 
-	public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	private static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -239,8 +239,8 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
 			return vertexWithCompId;
 		}
 	}
-	
-	public static class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+
+	private static class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
 
 		private final String aggName;
 		private LongSumAggregator aggr;
@@ -261,7 +261,7 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
 
 			if (vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1) {
 				out.collect(vertexWithNewAndOldId.f0);
-				aggr.aggregate(1l);
+				aggr.aggregate(1L);
 			}
 			else {
 				out.collect(vertexWithNewAndOldId.f1);
@@ -269,24 +269,24 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
 		}
 	}
 
-	public static final class MinimumIdFilterCounting 
+	private static final class MinimumIdFilterCounting
 			extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-		
+
 		private static final long[] aggr_value = new long[5];
-		
+
 		private final String aggName;
 		private LongSumAggregatorWithParameter aggr;
 
 		public MinimumIdFilterCounting(String aggName) {
 			this.aggName = aggName;
 		}
-		
+
 		@Override
 		public void open(Configuration conf) {
 			final int superstep = getIterationRuntimeContext().getSuperstepNumber();
-			
+
 			aggr = getIterationRuntimeContext().getIterationAggregator(aggName);
-			
+
 			if (superstep > 1 && getIterationRuntimeContext().getIndexOfThisSubtask() == 0) {
 				LongValue val = getIterationRuntimeContext().getPreviousIterationAggregate(aggName);
 				aggr_value[superstep - 2] = val.getValue();
@@ -301,24 +301,26 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
 			if (vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1) {
 				out.collect(vertexWithNewAndOldId.f0);
 				if (vertexWithNewAndOldId.f0.f1 == aggr.getComponentId()) {
-					aggr.aggregate(1l);
+					aggr.aggregate(1L);
 				}
 			} else {
 				out.collect(vertexWithNewAndOldId.f1);
 				if (vertexWithNewAndOldId.f1.f1 == aggr.getComponentId()) {
-					aggr.aggregate(1l);
+					aggr.aggregate(1L);
 				}
 			}
 		}
 	}
 
-	/** A Convergence Criterion with one parameter */
-	public static class UpdatedElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
+	/**
+	 * A Convergence Criterion with one parameter.
+	 */
+	private static class UpdatedElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
 
 		private final long threshold;
 
-		public UpdatedElementsConvergenceCriterion(long u_threshold) {
-			this.threshold = u_threshold;
+		public UpdatedElementsConvergenceCriterion(long uThreshold) {
+			this.threshold = uThreshold;
 		}
 
 		@Override
@@ -327,7 +329,7 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
 		}
 	}
 
-	public static final class LongSumAggregatorWithParameter extends LongSumAggregator {
+	private static final class LongSumAggregatorWithParameter extends LongSumAggregator {
 
 		private long componentId;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 042617d..64ee98a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -18,38 +18,38 @@
 
 package org.apache.flink.test.iterative.aggregators;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Assert;
-
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
+
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.util.Random;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -99,7 +99,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		env.registerCachedFile(resultPath, testName);
 
 		IterativeDataSet<Long> solution = env.fromElements(1L).iterate(2);
-		solution.closeWith(env.generateSequence(1,2).filter(new RichFilterFunction<Long>() {
+		solution.closeWith(env.generateSequence(1, 2).filter(new RichFilterFunction<Long>() {
 			@Override
 			public void open(Configuration parameters) throws Exception{
 				File file = getRuntimeContext().getDistributedCache().getFile(testName);
@@ -108,6 +108,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				reader.close();
 				assertEquals(output, testString);
 			}
+
 			@Override
 			public boolean filter(Long value) throws Exception {
 				return false;
@@ -311,7 +312,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class NegativeElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
+	private static final class NegativeElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
 
 		@Override
 		public boolean isConverged(int iteration, LongValue value) {
@@ -320,7 +321,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class NegativeElementsConvergenceCriterionWithParam implements ConvergenceCriterion<LongValue> {
+	private static final class NegativeElementsConvergenceCriterionWithParam implements ConvergenceCriterion<LongValue> {
 
 		private int value;
 
@@ -339,7 +340,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
+	private static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
 
 		private LongSumAggregator aggr;
 
@@ -354,14 +355,14 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 			Integer newValue = value - 1;
 			// count negative numbers
 			if (newValue < 0) {
-				aggr.aggregate(1l);
+				aggr.aggregate(1L);
 			}
 			return newValue;
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
+	private static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
 
 		private LongSumAggregatorWithParameter aggr;
 
@@ -374,15 +375,15 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		public Integer map(Integer value) {
 			Integer newValue = value - 1;
 			// count numbers less than the aggregator parameter
-			if ( newValue < aggr.getValue() ) {
-				aggr.aggregate(1l);
+			if (newValue < aggr.getValue()) {
+				aggr.aggregate(1L);
 			}
 			return newValue;
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static class LongSumAggregatorWithParameter extends LongSumAggregator {
+	private static class LongSumAggregatorWithParameter extends LongSumAggregator {
 
 		private int value;
 
@@ -396,7 +397,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
+	private static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
 
 		private Random rnd;
 
@@ -414,7 +415,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+	private static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
 
 		private LongSumAggregator aggr;
 		private LongValue previousAggr;
@@ -437,14 +438,14 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
 			// count the elements that are equal to the superstep number
 			if (value.f1 == superstep) {
-				aggr.aggregate(1l);
+				aggr.aggregate(1L);
 			}
 			return value;
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
+	private static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
 			Tuple2<Integer, Integer>> {
 
 		private int superstep;
@@ -465,7 +466,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
+	private static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
 
 		@Override
 		public Integer map(Tuple2<Integer, Integer> value) {
@@ -474,7 +475,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class AggregateAndSubtractOneDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+	private static final class AggregateAndSubtractOneDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
 
 		private LongSumAggregator aggr;
 		private LongValue previousAggr;
@@ -497,7 +498,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
 			// count the ones
 			if (value.f1 == 1) {
-				aggr.aggregate(1l);
+				aggr.aggregate(1L);
 			}
 			value.f1--;
 			return value;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
deleted file mode 100644
index fc01ce7..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.ValueCollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@RunWith(Parameterized.class)
-public class AggregateITCase extends MultipleProgramsTestBase {
-
-
-	public AggregateITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testFullAggregate() throws Exception {
-		/*
-		 * Full Aggregate
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple2<Integer, Long>> aggregateDs = ds
-				.aggregate(Aggregations.SUM, 0)
-				.and(Aggregations.MAX, 1)
-				.project(0, 1);
-
-		List<Tuple2<Integer, Long>> result = aggregateDs.collect();
-
-		String expected = "231,6\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testFullAggregateOfMutableValueTypes() throws Exception {
-		/*
-		 * Full Aggregate of mutable value types
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
-				.aggregate(Aggregations.SUM, 0)
-				.and(Aggregations.MAX, 1)
-				.project(0, 1);
-
-		List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
-
-		String expected = "231,6\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testGroupedAggregate() throws Exception {
-		/*
-		 * Grouped Aggregate
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
-				.aggregate(Aggregations.SUM, 0)
-				.project(1, 0);
-
-		List<Tuple2<Long, Integer>> result = aggregateDs.collect();
-
-		String expected = "1,1\n" +
-				"2,5\n" +
-				"3,15\n" +
-				"4,34\n" +
-				"5,65\n" +
-				"6,111\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testGroupedAggregateOfMutableValueTypes() throws Exception {
-		/*
-		 * Grouped Aggregate of mutable value types
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1)
-				.aggregate(Aggregations.SUM, 0)
-				.project(1, 0);
-
-		List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
-
-		String expected = "1,1\n" +
-				"2,5\n" +
-				"3,15\n" +
-				"4,34\n" +
-				"5,65\n" +
-				"6,111\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testNestedAggregate() throws Exception {
-		/*
-		 * Nested Aggregate
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
-				.aggregate(Aggregations.MIN, 0)
-				.aggregate(Aggregations.MIN, 0)
-				.project(0);
-
-		List<Tuple1<Integer>> result = aggregateDs.collect();
-
-		String expected = "1\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testNestedAggregateOfMutableValueTypes() throws Exception {
-		/*
-		 * Nested Aggregate of mutable value types
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
-				.aggregate(Aggregations.MIN, 0)
-				.aggregate(Aggregations.MIN, 0)
-				.project(0);
-
-		List<Tuple1<IntValue>> result = aggregateDs.collect();
-
-		String expected = "1\n";
-
-		compareResultAsTuples(result, expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java
deleted file mode 100644
index 35d615b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CoGroupGroupSortITCase extends JavaProgramTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		
-		DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
-				new Tuple2<Long, Long>(0L, 5L),
-				new Tuple2<Long, Long>(0L, 4L),
-				new Tuple2<Long, Long>(0L, 3L),
-				new Tuple2<Long, Long>(0L, 2L),
-				new Tuple2<Long, Long>(0L, 1L),
-				new Tuple2<Long, Long>(1L, 10L),
-				new Tuple2<Long, Long>(1L, 8L),
-				new Tuple2<Long, Long>(1L, 9L),
-				new Tuple2<Long, Long>(1L, 7L));
-		
-		DataSet<TestPojo> input2 = env.fromElements(
-				new TestPojo(0L, 10L, 3L),
-				new TestPojo(0L, 8L, 3L),
-				new TestPojo(0L, 10L, 1L),
-				new TestPojo(0L, 9L, 0L),
-				new TestPojo(0L, 8L, 2L),
-				new TestPojo(0L, 8L, 4L),
-				new TestPojo(1L, 10L, 3L),
-				new TestPojo(1L, 8L, 3L),
-				new TestPojo(1L, 10L, 1L),
-				new TestPojo(1L, 9L, 0L),
-				new TestPojo(1L, 8L, 2L),
-				new TestPojo(1L, 8L, 4L));
-		
-		input1.coGroup(input2)
-		.where(1).equalTo("b")
-		.sortFirstGroup(0, Order.DESCENDING)
-		.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-		
-		.with(new ValidatingCoGroup())
-		.output(new DiscardingOutputFormat<NullValue>());
-		
-		env.execute();
-	}
-	
-	
-	private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
-
-		@Override
-		public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
-			// validate the tuple input, field 1, descending
-			{
-				long lastValue = Long.MAX_VALUE;
-				
-				for (Tuple2<Long, Long> t : first) {
-					long current = t.f1;
-					Assert.assertTrue(current <= lastValue);
-					lastValue = current;
-				}
-			}
-			
-			
-			// validate the pojo input
-			{
-				TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
-				
-				for (TestPojo current : second) {
-					Assert.assertTrue(current.c >= lastValue.c);
-					Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
-					
-					lastValue = current;
-				}
-			}
-			
-		}
-	}
-	
-	public static class TestPojo implements Cloneable {
-		public long a;
-		public long b;
-		public long c;
-		
-		
-		public TestPojo() {}
-		
-		public TestPojo(long a, long b, long c) {
-			this.a = a;
-			this.b = b;
-			this.c = c;
-		}
-	}
-}