Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:18 UTC

[18/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
deleted file mode 100644
index b952bc2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.exampleScalaPrograms;
-
-
-import org.apache.flink.examples.scala.relational.WebLogAnalysis;
-import org.apache.flink.test.testdata.WebLogAnalysisData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class WebLogAnalysisITCase extends JavaProgramTestBase {
-
-	private String docsPath;
-	private String ranksPath;
-	private String visitsPath;
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
-		ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
-		visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
-	}
-	@Override
-	protected void testProgram() throws Exception {
-		WebLogAnalysis.main(new String[]{
-				"--documents", docsPath,
-				"--ranks", ranksPath,
-				"--visits", visitsPath,
-				"--output", resultPath});
-	}
-}

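A note on why this file is deleted outright rather than fixed in place: the package name exampleScalaPrograms is camel-cased, which strict checkstyle's PackageName rule rejects (package segments must be all lowercase), so the test cannot stay at this path. A minimal sketch of the conforming shape; the destination package below is an assumption, since the actual target lives in another part of this 22-part commit:

// Sketch only: "org.apache.flink.test.examples.scala" is an assumed,
// all-lowercase destination package, not a path confirmed by this hunk.
package org.apache.flink.test.examples.scala;

/**
 * Checkstyle's PackageName check typically enforces a pattern like
 * ^[a-z]+(\.[a-z][a-z0-9]*)*$, which "exampleScalaPrograms" violates.
 */
public class WebLogAnalysisITCase {
	// the unchanged test body would move here
}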
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
deleted file mode 100644
index ce55910..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.exampleScalaPrograms;
-
-import org.apache.flink.examples.scala.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class WordCountITCase extends JavaProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	public WordCountITCase(){
-		setParallelism(4);
-		setNumTaskManagers(2);
-		setTaskManagerNumSlots(2);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		WordCount.main(new String[] {
-				"--input", textPath,
-				"--output", resultPath });
-	}
-}

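Worth noting in the constructor above: parallelism 4 is exactly covered by 2 task managers with 2 slots each, so the mini cluster can schedule the whole job at once. A small self-contained sketch of that sizing invariant (plain arithmetic, not a Flink API; the class name is made up):

// Sketch: a job of parallelism p needs at least p free slots, i.e.
// p <= numTaskManagers * slotsPerTaskManager. Here 4 <= 2 * 2, so
// WordCountITCase never waits for slots that cannot appear.
public class SlotSizingSketch {
	public static void main(String[] args) {
		int numTaskManagers = 2;
		int slotsPerTaskManager = 2;
		int parallelism = 4;
		if (parallelism > numTaskManagers * slotsPerTaskManager) {
			throw new IllegalStateException("job would block waiting for slots");
		}
		System.out.println("mini cluster can host all " + parallelism + " subtasks");
	}
}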
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
deleted file mode 100644
index 10e3cb6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.failingPrograms;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@RunWith(Parameterized.class)
-public class JobSubmissionFailsITCase extends TestLogger {
-	
-	private static final int NUM_SLOTS = 20;
-	
-	private static LocalFlinkMiniCluster cluster;
-	private static JobGraph workingJobGraph;
-
-	@BeforeClass
-	public static void setup() {
-		try {
-			Configuration config = new Configuration();
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
-			
-			cluster = new LocalFlinkMiniCluster(config);
-
-			cluster.start();
-			
-			final JobVertex jobVertex = new JobVertex("Working job vertex.");
-			jobVertex.setInvokableClass(NoOpInvokable.class);
-			workingJobGraph = new JobGraph("Working testing job", jobVertex);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void teardown() {
-		try {
-			cluster.shutdown();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	private boolean detached;
-
-	public JobSubmissionFailsITCase(boolean detached) {
-		this.detached = detached;
-	}
-
-	@Parameterized.Parameters(name = "Detached mode = {0}")
-	public static Collection<Boolean[]> executionModes(){
-		return Arrays.asList(new Boolean[]{false},
-				new Boolean[]{true});
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
-		if (detached) {
-			cluster.submitJobDetached(jobGraph);
-			return null;
-		}
-		else {
-			return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
-		}
-	}
-
-	@Test
-	public void testExceptionInInitializeOnMaster() {
-		try {
-			final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
-			failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
-
-			try {
-				submitJob(failingJobGraph);
-				fail("Expected JobExecutionException.");
-			}
-			catch (JobExecutionException e) {
-				assertEquals("Test exception.", e.getCause().getMessage());
-			}
-			catch (Throwable t) {
-				t.printStackTrace();
-				fail("Caught wrong exception of type " + t.getClass() + ".");
-			}
-
-			cluster.submitJobAndWait(workingJobGraph, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSubmitEmptyJobGraph() {
-		try {
-			final JobGraph jobGraph = new JobGraph("Testing job");
-	
-			try {
-				submitJob(jobGraph);
-				fail("Expected JobSubmissionException.");
-			}
-			catch (JobSubmissionException e) {
-				assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
-			}
-			catch (Throwable t) {
-				t.printStackTrace();
-				fail("Caught wrong exception of type " + t.getClass() + ".");
-			}
-	
-			cluster.submitJobAndWait(workingJobGraph, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSubmitNullJobGraph() {
-		try {
-			try {
-				submitJob(null);
-				fail("Expected JobSubmissionException.");
-			}
-			catch (NullPointerException e) {
-				// yo!
-			}
-			catch (Throwable t) {
-				t.printStackTrace();
-				fail("Caught wrong exception of type " + t.getClass() + ".");
-			}
-
-			cluster.submitJobAndWait(workingJobGraph, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public static class FailingJobVertex extends JobVertex {
-		private static final long serialVersionUID = -6365291240199412135L;
-
-		public FailingJobVertex(final String msg) {
-			super(msg);
-		}
-
-		@Override
-		public void initializeOnMaster(ClassLoader loader) throws Exception {
-			throw new Exception("Test exception.");
-		}
-	}
-}

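The detached branch of submitJob above is why this test is parameterized: a detached submission returns before the job executes, so there is no JobExecutionResult to hand back, while submission-time validation (a null or empty JobGraph) still fails fast in both modes. A condensed sketch of that split, reusing the same mini-cluster calls and imports as the deleted test above:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;

// Sketch: attached submission blocks and reports a result; detached
// submission is fire-and-forget, but rejects malformed graphs immediately.
public class SubmitModeSketch {

	static JobExecutionResult submit(LocalFlinkMiniCluster cluster, JobGraph jobGraph, boolean detached) throws Exception {
		if (detached) {
			cluster.submitJobDetached(jobGraph);
			return null; // job is running (or failing) in the background
		}
		else {
			return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
		}
	}
}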
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
deleted file mode 100644
index f6fb0dd..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.failingPrograms;
-
-import java.util.List;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Assert;
-
-/**
- *
- * Tests that both jobs, the failing and the working one, are handled correctly. The first (failing) job must be
- * canceled and the client must report the failure. The second (working) job must finish successfully and compute the
- * correct result.
- *
- */
-public class TaskFailureITCase extends JavaProgramTestBase {
-
-	private static String EXCEPTION_STRING = "This is an expected Test Exception";
-
-	@Override
-	protected void testProgram() throws Exception {
-		//test failing version
-		try {
-			executeTask(new FailingTestMapper(), 1);
-		} catch (RuntimeException e) { //expected for collection execution
-			if (!isCollectionExecution()) {
-				Assert.fail();
-			}
-			// for collection execution, no restarts. So, exception should be appended with 0.
-			Assert.assertEquals(EXCEPTION_STRING + ":0", e.getMessage());
-		} catch (JobExecutionException e) { //expected for cluster execution
-			if (isCollectionExecution()) {
-				Assert.fail();
-			}
-			// for cluster execution, one restart. So, exception should be appended with 1.
-			Assert.assertEquals(EXCEPTION_STRING + ":1", e.getCause().getMessage());
-		}
-		//test correct version
-		executeTask(new TestMapper(), 0);
-	}
-	
-
-	private void executeTask(MapFunction<Long, Long> mapper, int retries) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retries, 0));
-		List<Long> result = env.generateSequence(1, 9)
-				.map(mapper)
-				.collect();
-		MultipleProgramsTestBase.compareResultAsText(result, "1\n2\n3\n4\n5\n6\n7\n8\n9");
-	}
-
-
-	/**
-	 * working map function
-	 */
-	public static class TestMapper implements MapFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long map(Long value) throws Exception {
-			return value;
-		}
-	}
-	
-	/**
-	 * failing map function
-	 */
-	public static class FailingTestMapper extends RichMapFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long map(Long value) throws Exception {
-			throw new RuntimeException(EXCEPTION_STRING + ":" + getRuntimeContext().getAttemptNumber());
-		}
-	}
-}

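The attempt-number assertions above encode simple restart arithmetic: fixedDelayRestart(n, delay) permits n restarts, so an always-failing task runs n + 1 times and its final failure reports getAttemptNumber() == n, while collection execution never restarts and stays at attempt 0. A minimal sketch of the configuration side (class name made up; it only sets the strategy):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;

// Sketch: with fixedDelayRestart(1, 0) the failing mapper above executes
// twice, with attempt numbers 0 and then 1, which is why the cluster-mode
// assertion expects EXCEPTION_STRING + ":1".
public class RestartArithmeticSketch {
	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
		// 1 permitted restart => 2 executions => final attempt number is 1
	}
}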
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
index 468b780..07b4d76 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
@@ -21,12 +21,13 @@ package org.apache.flink.test.hadoop.mapred;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.OperatingSystem;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
@@ -49,10 +50,13 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
 
+/**
+ * Integration tests for Hadoop IO formats.
+ */
 @RunWith(Parameterized.class)
 public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 
-	private static int NUM_PROGRAMS = 2;
+	private static final int NUM_PROGRAMS = 2;
 
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String[] resultPath;
@@ -61,9 +65,9 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 	private String sequenceFileInPathNull;
 
 	public HadoopIOFormatsITCase(Configuration config) {
-		super(config);	
+		super(config);
 	}
-	
+
 	@Before
 	public void checkOperatingSystem() {
 		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
@@ -89,25 +93,24 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 		Text value = new Text();
 		SequenceFile.Writer writer = null;
 		try {
-			writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
-			for (int i = 0; i < kvCount; i ++) {
-				if(i == 1) {
+			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
+			for (int i = 0; i < kvCount; i++) {
+				if (i == 1) {
 					// write key = 0 a bit more often.
-					for(int a = 0;a < 15; a++) {
+					for (int a = 0; a < 15; a++) {
 						key.set(i);
-						value.set(i+" - somestring");
+						value.set(i + " - somestring");
 						writer.append(key, value);
 					}
 				}
 				key.set(i);
-				value.set(i+" - somestring");
+				value.set(i + " - somestring");
 				writer.append(key, value);
 			}
 		} finally {
 			IOUtils.closeStream(writer);
 		}
 
-
 		//  ------------------ Long / Text Key Value pair: ------------
 
 		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
@@ -117,8 +120,8 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 		LongWritable value1 = new LongWritable();
 		SequenceFile.Writer writer1 = null;
 		try {
-			writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
-			for (int i = 0; i < kvCount; i ++) {
+			writer1 = SequenceFile.createWriter(fs, conf, path, NullWritable.class, value1.getClass());
+			for (int i = 0; i < kvCount; i++) {
 				value1.set(i);
 				writer1.append(NullWritable.get(), value1);
 			}
@@ -131,32 +134,32 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 	protected void testProgram() throws Exception {
 		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
 	}
-	
+
 	@Override
 	protected void postSubmit() throws Exception {
-		for(int i = 0; i < resultPath.length; i++) {
+		for (int i = 0; i < resultPath.length; i++) {
 			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
 		}
 	}
-	
+
 	@Parameters
 	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
 		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
 			Configuration config = new Configuration();
 			config.setInteger("ProgramId", i);
 			tConfigs.add(config);
 		}
-		
+
 		return TestBaseUtils.toParameterList(tConfigs);
 	}
-	
-	public static class HadoopIOFormatPrograms {
-		
-		public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
-			
+
+	private static class HadoopIOFormatPrograms {
+
+		public static String[] runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+
 			switch(progId) {
 			case 1: {
 				/**
@@ -184,7 +187,7 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 				});
 				res.writeAsText(resultPath[1]);
 				env.execute();
-				
+
 				// return expected result
 				return 	new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
 						"1 - somestring - 1\n" +
@@ -224,8 +227,8 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 			default:
 				throw new IllegalArgumentException("Invalid program id");
 			}
-			
+
 		}
-	
+
 	}
 }

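The import churn in this hunk is the most common strict-checkstyle fix in the series. Read together, the hunks converge on one grouping: org.apache.flink imports first, then other third-party imports (com.google, org.apache.hadoop, org.junit, ...), then java/javax, then static imports last, each group separated by a blank line. A sketch of that target ordering as inferred from these diffs; the authoritative definition is Flink's checkstyle ImportOrder configuration, not this file:

// Inferred import grouping; compiles as an otherwise empty class.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;

import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;

import static org.junit.Assert.assertEquals;

public class ImportOrderSketch {
}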
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index 7c1b30e..21f6985 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.test.testfunctions.Tokenizer;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.OperatingSystem;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -37,6 +37,11 @@ import org.apache.hadoop.mapred.TextOutputFormat;
 import org.junit.Assume;
 import org.junit.Before;
 
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
+
+/**
+ * Test WordCount with Hadoop input and output "mapred" (legacy) formats.
+ */
 public class WordCountMapredITCase extends JavaProgramTestBase {
 
 	protected String textPath;
@@ -87,7 +92,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
 			}
 		});
 
-
 		DataSet<Tuple2<String, Integer>> counts =
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				text.flatMap(new Tokenizer())
@@ -97,7 +101,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
 
 		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
 
-
 			@Override
 			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
 				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index fbc1994..0092fe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.test.testfunctions.Tokenizer;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.OperatingSystem;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -37,6 +37,11 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.junit.Assume;
 import org.junit.Before;
 
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
+
+/**
+ * Test WordCount with Hadoop input and output "mapreduce" (modern) formats.
+ */
 public class WordCountMapreduceITCase extends JavaProgramTestBase {
 
 	protected String textPath;
@@ -86,7 +91,6 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
 			}
 		});
 
-
 		DataSet<Tuple2<String, Integer>> counts =
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				text.flatMap(new Tokenizer())
@@ -96,7 +100,6 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
 
 		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
 
-
 			@Override
 			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
 				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
index e50dfd3..82f699d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.io;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple8;
@@ -32,6 +30,9 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -42,6 +43,9 @@ import java.io.File;
 import java.util.List;
 import java.util.Locale;
 
+/**
+ * Tests for {@link ExecutionEnvironment#readCsvFile}.
+ */
 @RunWith(Parameterized.class)
 public class CsvReaderITCase extends MultipleProgramsTestBase {
 	private String expected;
@@ -122,6 +126,9 @@ public class CsvReaderITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expected);
 	}
 
+	/**
+	 * POJO.
+	 */
 	public static class POJOItem {
 		public String f1;
 		private int f2;

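The POJOItem class that receives the one-line Javadoc here is also a compact reminder of Flink's POJO contract: a public no-argument constructor, and every field either public or reachable through a getter/setter pair, otherwise readCsvFile cannot map columns onto fields. A minimal conforming sketch (the accessor pair for the private field is assumed; the diff is cut off before POJOItem's methods):

// Minimal Flink-POJO sketch: no-arg constructor plus public-or-accessor
// visibility for every field that readCsvFile should populate.
public class PojoSketch {
	public String f1; // public field: usable directly
	private int f2;   // private field: needs the accessors below

	public PojoSketch() {}

	public int getF2() {
		return f2;
	}

	public void setF2(int f2) {
		this.f2 = f2;
	}
}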
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
index cfdc31a..90611eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
@@ -38,7 +38,7 @@ public class InputOutputITCase extends JavaProgramTestBase {
 		env.createInput(new TestNonRichInputFormat()).output(output);
 		try {
 			env.execute();
-		} catch(Exception e){
+		} catch (Exception e){
 			// we didn't break anything by making everything rich.
 			e.printStackTrace();
 			fail(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
index 98f19ff..4f25bad 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
@@ -79,9 +79,9 @@ public class RichInputOutputITCase extends JavaProgramTestBase {
 
 		@Override
 		public void open(FileInputSplit split) throws IOException{
-			try{
+			try {
 				getRuntimeContext().addAccumulator("DATA_SOURCE_ACCUMULATOR", counter);
-			} catch(UnsupportedOperationException e){
+			} catch (UnsupportedOperationException e){
 				// the accumulator is already added
 			}
 			super.open(split);
@@ -104,9 +104,9 @@ public class RichInputOutputITCase extends JavaProgramTestBase {
 
 		@Override
 		public void open(int a, int b){
-			try{
+			try {
 				getRuntimeContext().addAccumulator("DATA_SINK_ACCUMULATOR", counter);
-			} catch(UnsupportedOperationException e){
+			} catch (UnsupportedOperationException e){
 				// the accumulator is already added
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index d55a63f..66adbde 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -18,50 +18,49 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-
+import java.util.ArrayList;
+import java.util.List;
 
+/**
+ * Integration test for a bulk iteration with an all reduce.
+ */
 @SuppressWarnings("serial")
 public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
-		
+
 		IterativeDataSet<Integer> iteration = data.iterate(10);
-		
+
 		DataSet<Integer> result = data.reduceGroup(new PickOneAllReduce()).withBroadcastSet(iteration, "bc");
-		
+
 		final List<Integer> resultList = new ArrayList<Integer>();
 		iteration.closeWith(result).output(new LocalCollectionOutputFormat<Integer>(resultList));
-		
+
 		env.execute();
-		
+
 		Assert.assertEquals(8, resultList.get(0).intValue());
 	}
 
-	
-	public static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
-		
+	private static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
+
 		private Integer bcValue;
-		
+
 		@Override
 		public void open(Configuration parameters) {
 			List<Integer> bc = getRuntimeContext().getBroadcastVariable("bc");
@@ -74,8 +73,8 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
 				return;
 			}
 			final int x = bcValue;
-			
-			for (Integer y : records) { 
+
+			for (Integer y : records) {
 				if (y > x) {
 					out.collect(y);
 					return;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 2a0b004..2b53249 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.BufferedReader;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,22 +34,26 @@ import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
+import java.io.BufferedReader;
+import java.util.Iterator;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with a cogroup.
+ */
 public class CoGroupConnectedComponentsITCase extends JavaProgramTestBase {
-	
+
 	private static final long SEED = 0xBADC0FFEEBEEFL;
-	
+
 	private static final int NUM_VERTICES = 1000;
-	
+
 	private static final int NUM_EDGES = 10000;
 
-	
 	private static final int MAX_ITERATIONS = 100;
 
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
-	
-	
+
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
@@ -66,11 +67,11 @@ public class CoGroupConnectedComponentsITCase extends JavaProgramTestBase {
 			ConnectedComponentsData.checkOddEvenResult(reader);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  The test program
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 
@@ -111,7 +112,7 @@ public class CoGroupConnectedComponentsITCase extends JavaProgramTestBase {
 
 	@ForwardedFieldsFirst("f1->f1")
 	@ForwardedFieldsSecond("f0->f0")
-	public static final class MinIdAndUpdate implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	private static final class MinIdAndUpdate implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

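The @ForwardedFieldsFirst("f1->f1") and @ForwardedFieldsSecond("f0->f0") annotations kept on MinIdAndUpdate are semantic hints for the optimizer: they declare which input fields reach the output unchanged, so existing partitioning and sort orders on those fields can be reused instead of recomputed. A sketch of the same contract on a two-input function (a plain join here, simpler than the cogroup above):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch: the annotations promise that output f1 is copied from the first
// input's f1 and output f0 from the second input's f0; the body must keep
// that promise or the optimizer may produce wrong plans.
@ForwardedFieldsFirst("f1->f1")
@ForwardedFieldsSecond("f0->f0")
public class MinIdJoinSketch
		implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

	@Override
	public Tuple2<Long, Long> join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> current) {
		return new Tuple2<>(current.f0, candidate.f1);
	}
}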
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 7f5d194..a25cf24 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -18,56 +18,58 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.ConnectedComponents.DuplicateValue;
+import org.apache.flink.examples.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.examples.java.graph.ConnectedComponents.DuplicateValue;
-import org.apache.flink.examples.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with a cogroup.
+ */
 @SuppressWarnings("serial")
 public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase {
-	
+
 	private static final long SEED = 0xBADC0FFEEBEEFL;
-	
+
 	private static final int NUM_VERTICES = 1000;
-	
+
 	private static final int NUM_EDGES = 10000;
 
-	
 	@Override
 	protected void testProgram() throws Exception {
-			
+
 		// set up execution environment
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// read vertex and edge data
 		DataSet<Long> vertices = env.fromElements(ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES).split("\n"))
 				.map(new VertexParser());
-		
+
 		DataSet<Tuple2<Long, Long>> edges = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"))
 				.flatMap(new EdgeParser());
-		
+
 		// assign the initial components (equal to the vertex id)
 		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
-				
+
 		// open a delta iteration
 		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
 				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
-		
+
 		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
 		DataSet<Tuple2<Long, Long>> changes = iteration
 				.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
@@ -76,35 +78,34 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 
 		// close the delta iteration (delta and new workset are identical)
 		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-		
-		
+
 		// emit result
-		List<Tuple2<Long,Long>> resutTuples = new ArrayList<Tuple2<Long,Long>>();
-		result.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(resutTuples));
-		
+		List<Tuple2<Long, Long>> resutTuples = new ArrayList<>();
+		result.output(new LocalCollectionOutputFormat<>(resutTuples));
+
 		env.execute();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  The test program
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class VertexParser extends RichMapFunction<String, Long> {
+
+	private static final class VertexParser extends RichMapFunction<String, Long> {
 
 		@Override
 		public Long map(String value) throws Exception {
 			return Long.parseLong(value);
 		}
 	}
-	
-	public static final class EdgeParser extends RichFlatMapFunction<String, Tuple2<Long, Long>> {
+
+	private static final class EdgeParser extends RichFlatMapFunction<String, Tuple2<Long, Long>> {
 
 		@Override
 		public void flatMap(String value, Collector<Tuple2<Long, Long>> out) throws Exception {
 			String[] parts = value.split(" ");
 			long v1 = Long.parseLong(parts[0]);
 			long v2 = Long.parseLong(parts[1]);
-			
+
 			out.collect(new Tuple2<Long, Long>(v1, v2));
 			out.collect(new Tuple2<Long, Long>(v2, v1));
 		}
@@ -112,17 +113,17 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 
 	@ForwardedFieldsFirst("0")
 	@ForwardedFieldsSecond("0")
-	public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-		
+	private static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
 		@Override
 		public void coGroup(Iterable<Tuple2<Long, Long>> candidates, Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
 			Iterator<Tuple2<Long, Long>> iterator = current.iterator();
 			if (!iterator.hasNext()) {
 				throw new RuntimeException("Error: Id not encountered before.");
 			}
-			
+
 			Tuple2<Long, Long> old = iterator.next();
-			
+
 			long minimumComponentID = Long.MAX_VALUE;
 
 			for (Tuple2<Long, Long> candidate : candidates) {
@@ -131,7 +132,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 					minimumComponentID = candidateComponentID;
 				}
 			}
-			
+
 			if (minimumComponentID < old.f1) {
 				old.f1 = minimumComponentID;
 				out.collect(old);

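For readers skimming past the whitespace noise: the substantive pattern in these connected-components tests is the delta iteration, where a solution set of (vertexId, componentId) pairs is incrementally updated by a shrinking workset. A condensed, runnable skeleton of the API as used above (data inlined, step logic reduced to identity so the loop simply hits the iteration cap):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

// Skeleton of the tests' delta iteration: key field 0 identifies solution
// set entries; closeWith(delta, newWorkset) wires up the loop, which ends
// when the workset empties or after the 100-superstep cap.
public class DeltaIterationSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, Long>> initial = env.fromElements(
				new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L));

		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
				initial.iterateDelta(initial, 100, 0);

		// a real step joins the workset with edges and keeps minima; this
		// identity step just re-emits the workset until the cap is reached
		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset();

		iteration.closeWith(changes, changes).print();
	}
}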
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
index 61e08d4..ee5597b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
-import java.io.BufferedReader;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,16 +29,19 @@ import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.io.BufferedReader;
 
+/**
+ * Delta iteration test implementing the connected components algorithm with a join.
+ */
 public class ConnectedComponentsITCase extends JavaProgramTestBase {
-	
+
 	private static final long SEED = 0xBADC0FFEEBEEFL;
-	
+
 	private static final int NUM_VERTICES = 1000;
-	
+
 	private static final int NUM_EDGES = 10000;
 
-	
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
@@ -93,6 +93,11 @@ public class ConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 
+	/**
+	 * Duplicate the vertex ID into both fields of a {@link Tuple2}.
+	 *
+	 * @param <T> key type
+	 */
 	public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index bc4885f..e425f29 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
-import java.io.BufferedReader;
-import java.util.Collection;
-
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -35,25 +31,31 @@ import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import java.io.BufferedReader;
+import java.util.Collection;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with a
+ * cogroup and join on the solution set.
+ */
 @RunWith(Parameterized.class)
 public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTestBase {
-	
+
 	private static final long SEED = 0xBADC0FFEEBEEFL;
-	
+
 	private static final int NUM_VERTICES = 1000;
-	
+
 	private static final int NUM_EDGES = 10000;
 
-	
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
-	
-	
+
 	public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
 		super(config);
 	}
@@ -92,8 +94,8 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
 				.join(iteration.getSolutionSet()).where(0).equalTo(0)
 				.with(new UpdateComponentIdMatchNonPreserving());
 
-		DataSet<Tuple2<Long,Long>> delta;
-		if(extraMapper) {
+		DataSet<Tuple2<Long, Long>> delta;
+		if (extraMapper) {
 			delta = changes.map(
 					// ID Mapper
 					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
@@ -127,14 +129,14 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
 		config1.setBoolean("ExtraMapper", false);
-		
+
 		Configuration config2 = new Configuration();
 		config2.setBoolean("ExtraMapper", true);
-		
+
 		return toParameterList(config1, config2);
 	}
 
-	public static final class UpdateComponentIdMatchNonPreserving
+	private static final class UpdateComponentIdMatchNonPreserving
 			implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
 
@@ -144,7 +146,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
 				Tuple2<Long, Long> current,
 				Collector<Tuple2<Long, Long>> out) throws Exception {
 
-			if(candidate.f1 < current.f1) {
+			if (candidate.f1 < current.f1) {
 				out.collect(candidate);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
index a8a28f1..ce88ab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.BufferedReader;
-
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -33,22 +30,24 @@ import org.apache.flink.examples.java.graph.ConnectedComponents.UndirectEdge;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.io.BufferedReader;
+
+/**
+ * Delta iteration test implementing the connected components algorithm with an object map.
+ */
 @SuppressWarnings("serial")
 public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase {
-	
+
 	private static final long SEED = 0xBADC0FFEEBEEFL;
-	
+
 	private static final int NUM_VERTICES = 1000;
-	
+
 	private static final int NUM_EDGES = 10000;
 
-	
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
 
-	
-	
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
@@ -63,26 +62,25 @@ public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase
 		}
 	}
 
-	
 	@Override
 	protected void testProgram() throws Exception {
 		// set up execution environment
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
+
 		// read vertex and edge data
 		DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);
-		
+
 		DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
 												.flatMap(new UndirectEdge());
-				
+
 		// assign the initial components (equal to the vertex id)
 		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());
-						
+
 		// open a delta iteration
 		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
 				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
 		iteration.setSolutionSetUnManaged(true);
-				
+
 		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
 		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
 				.groupBy(0).aggregate(Aggregations.MIN, 1)
@@ -91,11 +89,11 @@ public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase
 
 		// close the delta iteration (delta and new workset are identical)
 		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-				
+
 		result.writeAsCsv(resultPath, "\n", " ");
-		
+
 		// execute program
 		env.execute("Connected Components Example");
 	}
-	
+
 }

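Aside from whitespace, the line worth noticing in this test is iteration.setSolutionSetUnManaged(true); it is what the "WithObjectMap" in the class name refers to. With the flag unset the solution set lives in Flink's managed memory as serialized binary data; set to true, entries are kept as plain Java objects in a heap map, which is the variant this test pins down. The relevant two lines from the program above, for emphasis:

		// the switch the test name refers to: heap object map instead of
		// the managed-memory hash table for the solution set
		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
		iteration.setSolutionSetUnManaged(true);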
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
index c2dd434..6f35917 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
-import java.io.BufferedReader;
-
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -34,18 +31,19 @@ import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
+import java.io.BufferedReader;
+
 /**
  * Tests a bug that prevented that the solution set can be on both sides of the match/cogroup function.
  */
 public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTestBase {
-	
+
 	private static final long SEED = 0xBADC0FFEEBEEFL;
-	
+
 	private static final int NUM_VERTICES = 1000;
-	
+
 	private static final int NUM_EDGES = 10000;
 
-	
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
@@ -56,7 +54,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
 		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
 		resultPath = getTempFilePath("results");
 	}
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		// set up execution environment
@@ -92,7 +90,6 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
 		env.execute("Connected Components Example");
 	}
 
-
 	@Override
 	protected void postSubmit() throws Exception {
 		for (BufferedReader reader : getResultReader(resultPath)) {
@@ -105,7 +102,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
 	// --------------------------------------------------------------------------------------------
 
 	@FunctionAnnotation.ForwardedFieldsSecond("*")
-	public static final class UpdateComponentIdMatchMirrored
+	private static final class UpdateComponentIdMatchMirrored
 			implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
 
@@ -115,7 +112,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTe
 				Tuple2<Long, Long> candidate,
 				Collector<Tuple2<Long, Long>> out) throws Exception {
 
-			if(candidate.f1 < current.f1) {
+			if (candidate.f1 < current.f1) {
 				out.collect(candidate);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
index 53496e2..8f8f28b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
@@ -46,14 +46,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
+/**
+ * Implementation of PageRank accounting for "sink" vertices with 0 out-degree.
+ */
 @RunWith(Parameterized.class)
 @SuppressWarnings({"serial", "unchecked"})
 public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 
 	private static final String AGGREGATOR_NAME = "pagerank.aggregator";
-	
-	
+
 	public DanglingPageRankITCase(TestExecutionMode mode) {
 		super(mode);
 	}
@@ -61,9 +62,9 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testDanglingPageRank() {
 		try {
-			final int NUM_ITERATIONS = 25;
+			final int numIterations = 25;
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Tuple2<Long, Boolean>> vertices = env.fromElements(
 					new Tuple2<>(1L, false),
 					new Tuple2<>(2L, false),
@@ -78,8 +79,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 					new PageWithLinks(4L, new long[] { 3, 2 }),
 					new PageWithLinks(1L, new long[] { 4, 2, 3 })
 			);
-			
-			
+
 			final long numVertices = vertices.count();
 			final long numDanglingVertices = vertices
 					.filter(
@@ -90,32 +90,31 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 								}
 							})
 					.count();
-			
-			
+
 			DataSet<PageWithRankAndDangling> verticesWithInitialRank = vertices
 					.map(new MapFunction<Tuple2<Long, Boolean>, PageWithRankAndDangling>() {
-						
+
 						@Override
 						public PageWithRankAndDangling map(Tuple2<Long, Boolean> value) {
 							return new PageWithRankAndDangling(value.f0, 1.0 / numVertices, value.f1);
 						}
 					});
-			
-			IterativeDataSet<PageWithRankAndDangling> iteration = verticesWithInitialRank.iterate(NUM_ITERATIONS);
+
+			IterativeDataSet<PageWithRankAndDangling> iteration = verticesWithInitialRank.iterate(numIterations);
 
 			iteration.getAggregators().registerAggregationConvergenceCriterion(
 					AGGREGATOR_NAME,
 					new PageRankStatsAggregator(),
 					new DiffL1NormConvergenceCriterion());
-			
+
 			DataSet<PageWithRank> partialRanks = iteration.join(edges).where("pageId").equalTo("pageId").with(
 					new FlatJoinFunction<PageWithRankAndDangling, PageWithLinks, PageWithRank>() {
-						
+
 						@Override
 						public void join(PageWithRankAndDangling page,
 											PageWithLinks links,
 											Collector<PageWithRank> out)  {
-							
+
 							double rankToDistribute = page.rank / (double) links.targets.length;
 							PageWithRank output = new PageWithRank(0L, rankToDistribute);
 
@@ -126,8 +125,8 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 						}
 					}
 			);
-			
-			DataSet<PageWithRankAndDangling> newRanks = 
+
+			DataSet<PageWithRankAndDangling> newRanks =
 				iteration.coGroup(partialRanks).where("pageId").equalTo("pageId").with(
 					new RichCoGroupFunction<PageWithRankAndDangling, PageWithRank, PageWithRankAndDangling>() {
 
@@ -136,15 +135,15 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 						private final double randomJump = (1.0 - BETA) / numVertices;
 						private PageRankStatsAggregator aggregator;
 						private double danglingRankFactor;
-						
+
 						@Override
 						public void open(Configuration parameters) throws Exception {
 							int currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-							
+
 							aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
 
 							if (currentIteration == 1) {
-								danglingRankFactor = BETA * (double) numDanglingVertices / 
+								danglingRankFactor = BETA * (double) numDanglingVertices /
 										((double) numVertices * (double) numVertices);
 							} else {
 								PageRankStats previousAggregate = getIterationRuntimeContext()
@@ -152,12 +151,12 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 								danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
 							}
 						}
-						
+
 						@Override
 						public void coGroup(Iterable<PageWithRankAndDangling> currentPages,
 											Iterable<PageWithRank> partialRanks,
 											Collector<PageWithRankAndDangling> out) {
-							
+
 							// compute the next rank
 							long edges = 0;
 							double summedRank = 0;
@@ -166,7 +165,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 								edges++;
 							}
 							double rank = BETA * summedRank + randomJump + danglingRankFactor;
-							
+
 							// current rank, for stats and convergence
 							PageWithRankAndDangling currentPage = currentPages.iterator().next();
 							double currentRank = currentPage.rank;
@@ -182,16 +181,16 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 							out.collect(currentPage);
 						}
 					});
-			
+
 			List<PageWithRankAndDangling> result = iteration.closeWith(newRanks).collect();
-			
+
 			double totalRank = 0.0;
 			for (PageWithRankAndDangling r : result) {
 				totalRank += r.rank;
 				assertTrue(r.pageId >= 1 && r.pageId <= 5);
 				assertTrue(r.pageId != 3 || r.dangling);
 			}
-			
+
 			assertEquals(1.0, totalRank, 0.001);
 		}
 		catch (Exception e) {
@@ -203,25 +202,31 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 	// ------------------------------------------------------------------------
 	//  custom types
 	// ------------------------------------------------------------------------
-	
+
+	/**
+	 * POJO for page ID and rank value.
+	 */
 	public static class PageWithRank {
-		
+
 		public long pageId;
 		public double rank;
 
 		public PageWithRank() {}
-		
+
 		public PageWithRank(long pageId, double rank) {
 			this.pageId = pageId;
 			this.rank = rank;
 		}
 	}
 
+	/**
+	 * POJO for page ID, rank value, and whether the page is a "dangling" vertex with zero out-degree.
+	 */
 	public static class PageWithRankAndDangling {
 
 		public long pageId;
 		public double rank;
-		public boolean dangling; 
+		public boolean dangling;
 
 		public PageWithRankAndDangling() {}
 
@@ -241,6 +246,9 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 		}
 	}
 
+	/**
+	 * POJO for page ID and list of target IDs.
+	 */
 	public static class PageWithLinks {
 
 		public long pageId;
@@ -253,11 +261,14 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 			this.targets = targets;
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  statistics
 	// ------------------------------------------------------------------------
 
+	/**
+	 * PageRank statistics.
+	 */
 	public static class PageRankStats implements Value {
 
 		private double diff;
@@ -272,7 +283,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 		public PageRankStats(
 					double diff, double rank, double danglingRank,
 					long numDanglingVertices, long numVertices, long edges) {
-			
+
 			this.diff = diff;
 			this.rank = rank;
 			this.danglingRank = danglingRank;
@@ -332,8 +343,8 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 					"]";
 		}
 	}
-	
-	public static class PageRankStatsAggregator implements Aggregator<PageRankStats> {
+
+	private static class PageRankStatsAggregator implements Aggregator<PageRankStats> {
 
 		private double diff;
 		private double rank;
@@ -348,7 +359,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 		}
 
 		public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
-							  long verticesDelta, long edgesDelta) {
+				long verticesDelta, long edgesDelta) {
 			diff += diffDelta;
 			rank += rankDelta;
 			danglingRank += danglingRankDelta;
@@ -378,7 +389,7 @@ public class DanglingPageRankITCase extends MultipleProgramsTestBase {
 		}
 	}
 
-	public static class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
+	private static class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
 
 		private static final double EPSILON = 0.00005;
 

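The aggregator wiring is the substantive part of the test above; condensed, the registration and lookup look like this (calls exactly as they appear in the diff):

	IterativeDataSet<PageWithRankAndDangling> iteration = verticesWithInitialRank.iterate(numIterations);

	// register an aggregator together with a convergence criterion so the bulk
	// iteration can terminate before numIterations supersteps have run
	iteration.getAggregators().registerAggregationConvergenceCriterion(
			AGGREGATOR_NAME,                       // looked up again inside open()
			new PageRankStatsAggregator(),         // sums diff/rank/dangling-rank per superstep
			new DiffL1NormConvergenceCriterion()); // stops once the L1 diff drops below EPSILON

	// inside a rich function on the dynamic path, the aggregates are reached via
	// the iteration runtime context:
	//   aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
	//   PageRankStats previous = getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
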
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
index caa9d37..84feacf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.test.iterative;
 
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -32,9 +27,18 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test delta iterations that do not join with the solution set.
+ */
 @SuppressWarnings("serial")
 public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTestBase {
-	private final List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long,Long>>();
+	private final List<Tuple2<Long, Long>> result = new ArrayList<>();
 
 	@Override
 	protected void testProgram() throws Exception {
@@ -47,7 +51,7 @@ public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTe
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 5, 1);
 
 			iteration.closeWith(iteration.getWorkset(), iteration.getWorkset().map(new TestMapper()))
-					.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(result));
+					.output(new LocalCollectionOutputFormat<Tuple2<Long, Long>>(result));
 
 			env.execute();
 		}
@@ -75,11 +79,11 @@ public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTe
 			return new Tuple2<T, T>(value, value);
 		}
 	}
-	
+
 	private static final class TestMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		@Override
 		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
-			return new Tuple2<Long, Long>(value.f0+10, value.f1+10);
+			return new Tuple2<>(value.f0 + 10, value.f1 + 10);
 		}
 	}
 }

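The two arguments of closeWith are what this test is about: the first is the delta merged into the solution set, the second is the next workset, and here neither touches the solution set. Reduced to its core (setup assumed; the duplicating mapper is hypothetical shorthand for the test's input mapper):

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<Long, Long>> input = env.generateSequence(0, 9).map(new Duplicator<Long>());

	DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
			input.iterateDelta(input, 5, 1); // key on field 1, at most 5 supersteps

	// delta = workset, next workset = mapped workset; the solution set is never joined
	iteration.closeWith(iteration.getWorkset(), iteration.getWorkset().map(new TestMapper()))
			.output(new LocalCollectionOutputFormat<Tuple2<Long, Long>>(result));
	env.execute();
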
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index 0635fe5..74e3da2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -18,30 +18,28 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
- * 
  * Iterative Connected Components test case which recomputes only the elements
  * of the solution set for which at least one dependency (in-neighbor) has changed since the last iteration.
  * Requires two joins with the solution set.
- *
  */
 @SuppressWarnings("serial")
 public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
-	
+
 	private static final int MAX_ITERATIONS = 20;
 	private static final int parallelism = 1;
 
@@ -53,46 +51,45 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 	public DependencyConnectedComponentsITCase(){
 		setTaskManagerNumSlots(parallelism);
 	}
-	
-	
+
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesInput.clear();
 		edgesInput.clear();
 
 		// vertices input
-		verticesInput.add(new Tuple2<Long, Long>(1l,1l));
-		verticesInput.add(new Tuple2<Long, Long>(2l,2l));
-		verticesInput.add(new Tuple2<Long, Long>(3l,3l));
-		verticesInput.add(new Tuple2<Long, Long>(4l,4l));
-		verticesInput.add(new Tuple2<Long, Long>(5l,5l));
-		verticesInput.add(new Tuple2<Long, Long>(6l,6l));
-		verticesInput.add(new Tuple2<Long, Long>(7l,7l));
-		verticesInput.add(new Tuple2<Long, Long>(8l,8l));
-		verticesInput.add(new Tuple2<Long, Long>(9l,9l));
-		
+		verticesInput.add(new Tuple2<>(1L, 1L));
+		verticesInput.add(new Tuple2<>(2L, 2L));
+		verticesInput.add(new Tuple2<>(3L, 3L));
+		verticesInput.add(new Tuple2<>(4L, 4L));
+		verticesInput.add(new Tuple2<>(5L, 5L));
+		verticesInput.add(new Tuple2<>(6L, 6L));
+		verticesInput.add(new Tuple2<>(7L, 7L));
+		verticesInput.add(new Tuple2<>(8L, 8L));
+		verticesInput.add(new Tuple2<>(9L, 9L));
+
 		// edges input
-		edgesInput.add(new Tuple2<Long, Long>(1l,2l));
-		edgesInput.add(new Tuple2<Long, Long>(1l,3l));
-		edgesInput.add(new Tuple2<Long, Long>(2l,3l));
-		edgesInput.add(new Tuple2<Long, Long>(2l,4l));
-		edgesInput.add(new Tuple2<Long, Long>(2l,1l));
-		edgesInput.add(new Tuple2<Long, Long>(3l,1l));
-		edgesInput.add(new Tuple2<Long, Long>(3l,2l));
-		edgesInput.add(new Tuple2<Long, Long>(4l,2l));
-		edgesInput.add(new Tuple2<Long, Long>(4l,6l));
-		edgesInput.add(new Tuple2<Long, Long>(5l,6l));
-		edgesInput.add(new Tuple2<Long, Long>(6l,4l));
-		edgesInput.add(new Tuple2<Long, Long>(6l,5l));
-		edgesInput.add(new Tuple2<Long, Long>(7l,8l));
-		edgesInput.add(new Tuple2<Long, Long>(7l,9l));
-		edgesInput.add(new Tuple2<Long, Long>(8l,7l));
-		edgesInput.add(new Tuple2<Long, Long>(8l,9l));
-		edgesInput.add(new Tuple2<Long, Long>(9l,7l));
-		edgesInput.add(new Tuple2<Long, Long>(9l,8l));
-		
+		edgesInput.add(new Tuple2<>(1L, 2L));
+		edgesInput.add(new Tuple2<>(1L, 3L));
+		edgesInput.add(new Tuple2<>(2L, 3L));
+		edgesInput.add(new Tuple2<>(2L, 4L));
+		edgesInput.add(new Tuple2<>(2L, 1L));
+		edgesInput.add(new Tuple2<>(3L, 1L));
+		edgesInput.add(new Tuple2<>(3L, 2L));
+		edgesInput.add(new Tuple2<>(4L, 2L));
+		edgesInput.add(new Tuple2<>(4L, 6L));
+		edgesInput.add(new Tuple2<>(5L, 6L));
+		edgesInput.add(new Tuple2<>(6L, 4L));
+		edgesInput.add(new Tuple2<>(6L, 5L));
+		edgesInput.add(new Tuple2<>(7L, 8L));
+		edgesInput.add(new Tuple2<>(7L, 9L));
+		edgesInput.add(new Tuple2<>(8L, 7L));
+		edgesInput.add(new Tuple2<>(8L, 9L));
+		edgesInput.add(new Tuple2<>(9L, 7L));
+		edgesInput.add(new Tuple2<>(9L, 8L));
+
 		resultPath = getTempDirPath("result");
-		
+
 		expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,1)\n" + "(4,1)\n" +
 						"(5,1)\n" + "(6,1)\n" + "(7,7)\n" + "(8,7)\n" + "(9,7)\n";
 	}
@@ -101,63 +98,67 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 	protected void testProgram() throws Exception {
 		DependencyConnectedComponentsProgram.runProgram(resultPath);
 	}
-	
+
 	@Override
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(expectedResult, resultPath);
 	}
 
-	
 	private static class DependencyConnectedComponentsProgram {
-		
+
 		public static String runProgram(String resultPath) throws Exception {
-			
+
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(parallelism);
-			
+
 			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
 			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
 			int keyPosition = 0;
-			
+
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
 					initialSolutionSet.iterateDelta(initialSolutionSet, MAX_ITERATIONS, keyPosition);
-			
+
 			DataSet<Long> candidates = iteration.getWorkset().join(edges).where(0).equalTo(0)
 					.with(new FindCandidatesJoin())
-					.groupBy(new KeySelector<Long, Long>() { 
-                        public Long getKey(Long id) { return id; } 
-                      }).reduceGroup(new RemoveDuplicatesReduce());
-			
-			DataSet<Tuple2<Long, Long>> candidatesDependencies = 
+					.groupBy(new KeySelector<Long, Long>() {
+						public Long getKey(Long id) {
+							return id;
+						}
+					}).reduceGroup(new RemoveDuplicatesReduce());
+
+			DataSet<Tuple2<Long, Long>> candidatesDependencies =
 					candidates.join(edges)
-					.where(new KeySelector<Long, Long>() { 
-                        public Long getKey(Long id) { return id; } 
-                      }).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() { 
-                        public Long getKey(Tuple2<Long, Long> vertexWithId) 
-                        { return vertexWithId.f1; } 
-                      }).with(new FindCandidatesDependenciesJoin());
-			
-			DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
+					.where(new KeySelector<Long, Long>() {
+						public Long getKey(Long id) {
+							return id;
+						}
+					}).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
+						public Long getKey(Tuple2<Long, Long> vertexWithId) {
+							return vertexWithId.f1;
+						}
+					}).with(new FindCandidatesDependenciesJoin());
+
+			DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
 					candidatesDependencies.join(iteration.getSolutionSet()).where(0).equalTo(0)
 					.with(new NeighborWithComponentIDJoin())
 					.groupBy(0).reduceGroup(new MinimumReduce());
-			
-			DataSet<Tuple2<Long, Long>> updatedComponentId = 
+
+			DataSet<Tuple2<Long, Long>> updatedComponentId =
 					verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
 					.flatMap(new MinimumIdFilter());
-			
+
 			iteration.closeWith(updatedComponentId, updatedComponentId).writeAsText(resultPath);
-			
+
 			env.execute();
-			
+
 			return resultPath;
 		}
 	}
-	
-	public static final class FindCandidatesJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
-		
+
+	private static final class FindCandidatesJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
+
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Long join(Tuple2<Long, Long> vertexWithCompId,
 				Tuple2<Long, Long> edge) throws Exception {
@@ -165,9 +166,9 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 			return edge.f1;
 		}
 	}
-	
-	public static final class RemoveDuplicatesReduce extends RichGroupReduceFunction<Long, Long> {
-		
+
+	private static final class RemoveDuplicatesReduce extends RichGroupReduceFunction<Long, Long> {
+
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -175,35 +176,35 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 				out.collect(values.iterator().next());
 		}
 	}
-	
-	public static final class FindCandidatesDependenciesJoin extends RichJoinFunction<Long, Tuple2<Long, Long>,Tuple2<Long, Long>> {
-	
+
+	private static final class FindCandidatesDependenciesJoin extends RichJoinFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Tuple2<Long, Long> join(Long candidateId, Tuple2<Long, Long> edge) throws Exception {
 			return edge;
 		}
 	}
-	
-	public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-	
+
+	private static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Tuple2<Long, Long> join(Tuple2<Long, Long> edge,
 				Tuple2<Long, Long> vertexWithCompId) throws Exception {
-			
+
 			vertexWithCompId.setField(edge.f1, 0);
 			return vertexWithCompId;
 		}
 	}
-	
-	public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-		
+
+	private static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
 		private static final long serialVersionUID = 1L;
 		final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
-		
+
 		@Override
 		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
 			Long vertexId = 0L;
@@ -223,16 +224,15 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 
-	public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-	
+	private static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+
 		private static final long serialVersionUID = 1L;
-	
+
 		@Override
 		public void flatMap(
 				Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId,
-				Collector<Tuple2<Long, Long>> out)
-		{
-			if ( vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1 ) {
+				Collector<Tuple2<Long, Long>> out) {
+			if (vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1) {
 				out.collect(vertexWithNewAndOldId.f0);
 			}
 		}

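All of the reformatted anonymous classes above follow the same key-selector join shape; isolated for clarity, with types taken from the test:

	// join candidate IDs against edges keyed by their target field, using
	// KeySelectors instead of positional keys
	DataSet<Tuple2<Long, Long>> candidatesDependencies = candidates
			.join(edges)
			.where(new KeySelector<Long, Long>() {
				public Long getKey(Long id) {
					return id;
				}
			}).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
				public Long getKey(Tuple2<Long, Long> edge) {
					return edge.f1; // target vertex of the edge
				}
			}).with(new FindCandidatesDependenciesJoin());
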
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
index 3f02064..f2548a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.test.iterative;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,31 +26,37 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests an empty (identity) delta iteration.
+ */
 @SuppressWarnings("serial")
 public class EmptyWorksetIterationITCase extends JavaProgramTestBase {
-	
+
 	private List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long, Long>>();
-	
+
 	@Override
 	protected void testProgram() throws Exception {
-		
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20).map(new Dupl());
-				
+
 		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 20, 0);
 		iter.closeWith(iter.getWorkset(), iter.getWorkset())
 			.output(new LocalCollectionOutputFormat<Tuple2<Long, Long>>(result));
-		
+
 		env.execute();
 	}
 
-	public static final class Dupl implements MapFunction<Long, Tuple2<Long, Long>> {
+	private static final class Dupl implements MapFunction<Long, Tuple2<Long, Long>> {
 
 		@Override
 		public Tuple2<Long, Long> map(Long value) {
 			return new Tuple2<Long, Long>(value, value);
 		}
-		
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
index c422aa4..9e9b5ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
@@ -18,35 +18,38 @@
 
 package org.apache.flink.test.iterative;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests an empty (identity) bulk iteration.
+ */
 public class IdentityIterationITCase extends JavaProgramTestBase {
 
 	private List<Long> result = new ArrayList<Long>();
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		IterativeDataSet<Long> iteration = env.generateSequence(1, 10).iterate(100);
 		iteration.closeWith(iteration)
 			.output(new LocalCollectionOutputFormat<Long>(result));
-		
+
 		env.execute();
 	}
-	
+
 	@Override
 	protected void postSubmit()  {
 		assertEquals(10, result.size());
-		
+
 		long sum = 0;
 		for (Long l : result) {
 			sum += l;

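For contrast with the delta-iteration tests, the bulk identity iteration above reduces to two calls; with no convergence criterion or aggregator registered, it runs all 100 supersteps:

	IterativeDataSet<Long> iteration = env.generateSequence(1, 10).iterate(100);
	iteration.closeWith(iteration) // feed the input straight back, unchanged
			.output(new LocalCollectionOutputFormat<Long>(result));
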
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
index 48788a5..458e453 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
@@ -26,16 +26,20 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * Test in which the data is constructed such that the merge-join zig-zag
+ * has an early out, leaving elements on the dynamic path input unconsumed.
+ */
 @SuppressWarnings("serial")
 public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgramTestBase {
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// the test data is constructed such that the merge join zig zag
 		// has an early out, leaving elements on the dynamic path input unconsumed
-		
+
 		DataSet<Path> edges = env.fromElements(
 				new Path(1, 2),
 				new Path(1, 4),
@@ -46,24 +50,24 @@ public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgram
 				new Path(3, 14),
 				new Path(3, 16),
 				new Path(1, 18),
-				new Path(1, 20) );
-		
+				new Path(1, 20));
+
 		IterativeDataSet<Path> currentPaths = edges.iterate(10);
-		
+
 		DataSet<Path> newPaths = currentPaths
 				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
 					.with(new PathConnector())
 				.union(currentPaths).distinct("from", "to");
-		
+
 		DataSet<Path> result = currentPaths.closeWith(newPaths);
-		
+
 		result.output(new DiscardingOutputFormat<Path>());
-		
+
 		env.execute();
 	}
-	
+
 	private static class PathConnector implements JoinFunction<Path, Path, Path> {
-		
+
 		@Override
 		public Path join(Path path, Path edge)  {
 			return new Path(path.from, edge.to);
@@ -71,19 +75,22 @@ public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgram
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Simple POJO.
+	 */
 	public static class Path {
-		
+
 		public long from;
 		public long to;
-		
+
 		public Path() {}
-		
+
 		public Path(long from, long to) {
 			this.from = from;
 			this.to = to;
 		}
-		
+
 		@Override
 		public String toString() {
 			return "(" + from + "," + to + ")";

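The join hint is what makes the early-out scenario reproducible; the relevant step, annotated (a reading of the code above, not new behavior):

	// REPARTITION_SORT_MERGE forces a sort-merge join; with the skewed keys in
	// `edges`, the merge can exhaust one side early (the "zig-zag early out"),
	// leaving records on the dynamic path input unconsumed
	DataSet<Path> newPaths = currentPaths
			.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
				.with(new PathConnector())   // extend each path by one hop
			.union(currentPaths)
			.distinct("from", "to");         // de-duplicate after the closure step
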
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
index b42e86b..f7cd111 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
@@ -26,16 +26,20 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * Test in which the data is constructed such that the merge-join zig-zag
+ * has an early out, leaving elements on the static path input unconsumed.
+ */
 @SuppressWarnings("serial")
 public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramTestBase {
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-	
+
 		// the test data is constructed such that the merge join zig zag
 		// has an early out, leaving elements on the static path input unconsumed
-		
+
 		DataSet<Path> edges = env.fromElements(
 				new Path(2, 1),
 				new Path(4, 1),
@@ -46,24 +50,24 @@ public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramT
 				new Path(14, 3),
 				new Path(16, 3),
 				new Path(18, 1),
-				new Path(20, 1) );
-		
+				new Path(20, 1));
+
 		IterativeDataSet<Path> currentPaths = edges.iterate(10);
-		
+
 		DataSet<Path> newPaths = currentPaths
 				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
 					.with(new PathConnector())
 				.union(currentPaths).distinct("from", "to");
-		
+
 		DataSet<Path> result = currentPaths.closeWith(newPaths);
-		
+
 		result.output(new DiscardingOutputFormat<Path>());
-		
+
 		env.execute();
 	}
-	
+
 	private static class PathConnector implements JoinFunction<Path, Path, Path> {
-		
+
 		@Override
 		public Path join(Path path, Path edge)  {
 			return new Path(path.from, edge.to);
@@ -71,19 +75,22 @@ public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramT
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Simple POJO.
+	 */
 	public static class Path {
-		
+
 		public long from;
 		public long to;
-		
+
 		public Path() {}
-		
+
 		public Path(long from, long to) {
 			this.from = from;
 			this.to = to;
 		}
-		
+
 		@Override
 		public String toString() {
 			return "(" + from + "," + to + ")";