Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 16:23:01 UTC

[1/4] flink git commit: [FLINK-7185] Activate checkstyle flink-java/io

Repository: flink
Updated Branches:
  refs/heads/master d578810b4 -> 3c42557f3


http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index d047aa6..8939c5a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -16,11 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -30,6 +33,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -49,13 +53,16 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link CsvInputFormat}.
+ */
 public class CsvInputFormatTest {
-	
+
 	private static final Path PATH = new Path("an/ignored/file/");
-	
+
 	//Static variables for testing the removal of \r\n to \n
 	private static final String FIRST_PART = "That is the first part";
-	
+
 	private static final String SECOND_PART = "That is the second part";
 
 	@Test
@@ -70,7 +77,7 @@ public class CsvInputFormatTest {
 
 	private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
 		final String fileContent =
-			"this is|1|2.0|\n"+
+			"this is|1|2.0|\n" +
 			"a test|3|4.0|\n" +
 			"#next|5|6.0|\n" +
 			"asdadas|5|30.0|\n";
@@ -173,9 +180,9 @@ public class CsvInputFormatTest {
 
 	private void ignoreInvalidLines(int bufferSize) {
 		try {
-			final String fileContent =  "#description of the data\n" + 
-										"header1|header2|header3|\n"+
-										"this is|1|2.0|\n"+
+			final String fileContent =  "#description of the data\n" +
+										"header1|header2|header3|\n" +
+										"this is|1|2.0|\n" +
 										"//a comment\n" +
 										"a test|3|4.0|\n" +
 										"#next|5|6.0|\n" +
@@ -191,8 +198,7 @@ public class CsvInputFormatTest {
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
 			format.open(split);
-			
-			
+
 			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
 			result = format.nextRecord(result);
 			assertNotNull(result);
@@ -224,34 +230,34 @@ public class CsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void ignoreSingleCharPrefixComments() {
 		try {
 			final String fileContent = "#description of the data\n" +
-									   "#successive commented line\n" +
-									   "this is|1|2.0|\n" +
-									   "a test|3|4.0|\n" +
-									   "#next|5|6.0|\n";
-			
+				"#successive commented line\n" +
+				"this is|1|2.0|\n" +
+				"a test|3|4.0|\n" +
+				"#next|5|6.0|\n";
+
 			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
 			final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
 			format.setCommentPrefix("#");
-		
+
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
 			format.open(split);
-			
+
 			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("this is", result.f0);
 			assertEquals(Integer.valueOf(1), result.f1);
 			assertEquals(new Double(2.0), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("a test", result.f0);
@@ -266,42 +272,41 @@ public class CsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void ignoreMultiCharPrefixComments() {
 		try {
-			
-			
+
 			final String fileContent = "//description of the data\n" +
-									   "//successive commented line\n" +
-									   "this is|1|2.0|\n"+
-									   "a test|3|4.0|\n" +
-									   "//next|5|6.0|\n";
-			
+				"//successive commented line\n" +
+				"this is|1|2.0|\n" +
+				"a test|3|4.0|\n" +
+				"//next|5|6.0|\n";
+
 			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
 			final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
 			format.setCommentPrefix("//");
-		
+
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
 			format.open(split);
-			
+
 			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("this is", result.f0);
 			assertEquals(Integer.valueOf(1), result.f1);
 			assertEquals(new Double(2.0), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("a test", result.f0);
 			assertEquals(Integer.valueOf(3), result.f1);
 			assertEquals(new Double(4.0), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 		}
@@ -323,27 +328,27 @@ public class CsvInputFormatTest {
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
 			format.open(split);
-			
+
 			Tuple3<String, String, String> result = new Tuple3<String, String, String>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("abc", result.f0);
 			assertEquals("def", result.f1);
 			assertEquals("ghijk", result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("abc", result.f0);
 			assertEquals("", result.f1);
 			assertEquals("hhg", result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("", result.f0);
 			assertEquals("", result.f1);
 			assertEquals("", result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 			assertTrue(format.reachedEnd());
@@ -397,7 +402,7 @@ public class CsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void readStringFieldsWithTrailingDelimiters() {
 		try {
@@ -406,26 +411,26 @@ public class CsvInputFormatTest {
 
 			final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
 			final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
-			
+
 			format.setFieldDelimiter("|-");
 
 			format.configure(new Configuration());
 			format.open(split);
 
 			Tuple3<String, String, String> result = new Tuple3<String, String, String>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("abc", result.f0);
 			assertEquals("def", result.f1);
 			assertEquals("ghijk", result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("abc", result.f0);
 			assertEquals("", result.f1);
 			assertEquals("hhg", result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("", result.f0);
@@ -594,19 +599,19 @@ public class CsvInputFormatTest {
 	public void testDoubleFields() throws IOException {
 		try {
 			final String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple5<Double, Double, Double, Double, Double>> typeInfo =
 					TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class);
 			final CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>> format = new TupleCsvInputFormat<Tuple5<Double, Double, Double, Double, Double>>(PATH, typeInfo);
-			
+
 			format.setFieldDelimiter("|");
 
 			format.configure(new Configuration());
 			format.open(split);
-			
+
 			Tuple5<Double, Double, Double, Double, Double> result = new Tuple5<Double, Double, Double, Double, Double>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Double.valueOf(11.1), result.f0);
@@ -614,7 +619,7 @@ public class CsvInputFormatTest {
 			assertEquals(Double.valueOf(33.3), result.f2);
 			assertEquals(Double.valueOf(44.4), result.f3);
 			assertEquals(Double.valueOf(55.5), result.f4);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Double.valueOf(66.6), result.f0);
@@ -622,7 +627,7 @@ public class CsvInputFormatTest {
 			assertEquals(Double.valueOf(88.8), result.f2);
 			assertEquals(Double.valueOf(99.9), result.f3);
 			assertEquals(Double.valueOf(00.0), result.f4);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 			assertTrue(format.reachedEnd());
@@ -631,7 +636,7 @@ public class CsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testReadFirstN() throws IOException {
 		try {
@@ -640,24 +645,24 @@ public class CsvInputFormatTest {
 
 			final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
 			final CsvInputFormat<Tuple2<Integer, Integer>> format = new TupleCsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
-			
+
 			format.setFieldDelimiter("|");
 
 			format.configure(new Configuration());
 			format.open(split);
-			
+
 			Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(111), result.f0);
 			assertEquals(Integer.valueOf(222), result.f1);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(666), result.f0);
 			assertEquals(Integer.valueOf(777), result.f1);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 			assertTrue(format.reachedEnd());
@@ -665,38 +670,38 @@ public class CsvInputFormatTest {
 		catch (Exception ex) {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
-		
+
 	}
-	
+
 	@Test
 	public void testReadSparseWithNullFieldsForTypes() throws IOException {
 		try {
 			final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
 					"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
 			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});
-			
+
 			format.setFieldDelimiter("|x|");
-			
+
 			format.configure(new Configuration());
 			format.open(split);
-			
+
 			Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(111), result.f0);
 			assertEquals(Integer.valueOf(444), result.f1);
 			assertEquals(Integer.valueOf(888), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(000), result.f0);
 			assertEquals(Integer.valueOf(777), result.f1);
 			assertEquals(Integer.valueOf(333), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 			assertTrue(format.reachedEnd());
@@ -705,35 +710,35 @@ public class CsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testReadSparseWithPositionSetter() throws IOException {
 		try {
 			final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
 			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new int[]{0, 3, 7});
-			
+
 			format.setFieldDelimiter("|");
-			
+
 			format.configure(new Configuration());
 			format.open(split);
-			
+
 			Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(111), result.f0);
 			assertEquals(Integer.valueOf(444), result.f1);
 			assertEquals(Integer.valueOf(888), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(000), result.f0);
 			assertEquals(Integer.valueOf(777), result.f1);
 			assertEquals(Integer.valueOf(333), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 			assertTrue(format.reachedEnd());
@@ -748,30 +753,30 @@ public class CsvInputFormatTest {
 		try {
 			final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
 					"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
 			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});
-			
+
 			format.setFieldDelimiter("&&");
-			
+
 			format.configure(new Configuration());
 			format.open(split);
-			
+
 			Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(111), result.f0);
 			assertEquals(Integer.valueOf(444), result.f1);
 			assertEquals(Integer.valueOf(888), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(000), result.f0);
 			assertEquals(Integer.valueOf(777), result.f1);
 			assertEquals(Integer.valueOf(333), result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 			assertTrue(format.reachedEnd());
@@ -784,7 +789,7 @@ public class CsvInputFormatTest {
 	@Test
 	public void testParseStringErrors() throws Exception {
 		StringParser stringParser = new StringParser();
-		stringParser.enableQuotedStringParsing((byte)'"');
+		stringParser.enableQuotedStringParsing((byte) '"');
 
 		Object[][] failures = {
 				{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
@@ -801,7 +806,6 @@ public class CsvInputFormatTest {
 			assertThat(stringParser.getErrorState(), is(failure[1]));
 		}
 
-
 	}
 
 	// Test disabled becase we do not support double-quote escaped quotes right now.
@@ -836,7 +840,7 @@ public class CsvInputFormatTest {
 				new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
 				new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
 				new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
-				new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00	),
+				new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00),
 				new Tuple5<Integer, String, String, String, Double>(0, "", "Venture \"Extended Edition\"", "", 4900.0)
 		};
 
@@ -864,37 +868,37 @@ public class CsvInputFormatTest {
 		);
 		wrt.write(content);
 		wrt.close();
-			
+
 		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
 	}
-	
+
 	@Test
 	public void testWindowsLineEndRemoval() {
-		
+
 		//Check typical use case -- linux file is correct and it is set up to linuc(\n)
 		this.testRemovingTrailingCR("\n", "\n");
-		
+
 		//Check typical windows case -- windows file endings and file has windows file endings set up
 		this.testRemovingTrailingCR("\r\n", "\r\n");
-		
+
 		//Check problematic case windows file -- windows file endings(\r\n) but linux line endings (\n) set up
 		this.testRemovingTrailingCR("\r\n", "\n");
-		
+
 		//Check problematic case linux file -- linux file endings (\n) but windows file endings set up (\r\n)
 		//Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard.
 	}
-	
+
 	private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) {
-		File tempFile=null;
-		
+		File tempFile = null;
+
 		String fileContent = CsvInputFormatTest.FIRST_PART + lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile;
-		
+
 		try {
 			// create input file
 			tempFile = File.createTempFile("CsvInputFormatTest", "tmp");
 			tempFile.deleteOnExit();
 			tempFile.setWritable(true);
-			
+
 			OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
 			wrt.write(fileContent);
 			wrt.close();
@@ -902,28 +906,26 @@ public class CsvInputFormatTest {
 			final TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 			final CsvInputFormat<Tuple1<String>> inputFormat = new TupleCsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
 
-			Configuration parameters = new Configuration(); 
+			Configuration parameters = new Configuration();
 			inputFormat.configure(parameters);
-			
+
 			inputFormat.setDelimiter(lineBreakerSetup);
-			
+
 			FileInputSplit[] splits = inputFormat.createInputSplits(1);
-						
+
 			inputFormat.open(splits[0]);
-			
+
 			Tuple1<String> result = inputFormat.nextRecord(new Tuple1<String>());
-			
+
 			assertNotNull("Expecting to not return null", result);
-			
-			
-			
+
 			assertEquals(FIRST_PART, result.f0);
-			
+
 			result = inputFormat.nextRecord(result);
-			
+
 			assertNotNull("Expecting to not return null", result);
 			assertEquals(SECOND_PART, result.f0);
-			
+
 		}
 		catch (Throwable t) {
 			System.err.println("test failed with exception: " + t.getMessage());
@@ -1219,7 +1221,7 @@ public class CsvInputFormatTest {
 		writer.close();
 
 		@SuppressWarnings("unchecked")
-		PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>)TypeExtractor.createTypeInfo(TwitterPOJO.class);
+		PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>) TypeExtractor.createTypeInfo(TwitterPOJO.class);
 		CsvInputFormat<TwitterPOJO> inputFormat = new PojoCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.configure(new Configuration());
@@ -1238,7 +1240,7 @@ public class CsvInputFormatTest {
 
 		TwitterPOJO pojo;
 
-		while((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) {
+		while ((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) {
 			actual.add(pojo);
 		}
 
@@ -1249,6 +1251,9 @@ public class CsvInputFormatTest {
 	// Custom types for testing
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Sample test pojo.
+	 */
 	public static class PojoItem {
 		public int field1;
 		public String field2;
@@ -1256,6 +1261,9 @@ public class CsvInputFormatTest {
 		public String field4;
 	}
 
+	/**
+	 * Sample test pojo with private fields.
+	 */
 	public static class PrivatePojoItem {
 		private int field1;
 		private String field2;
@@ -1295,6 +1303,9 @@ public class CsvInputFormatTest {
 		}
 	}
 
+	/**
+	 * Sample test pojo.
+	 */
 	public static class POJO {
 		public String table;
 		public String time;
@@ -1319,6 +1330,9 @@ public class CsvInputFormatTest {
 		}
 	}
 
+	/**
+	 * Sample test pojo representing tweets.
+	 */
 	public static class TwitterPOJO extends POJO {
 		public String tweet;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
index a8ce495..a244306 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -37,13 +37,16 @@ import java.util.List;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link CsvOutputFormat}.
+ */
 public class CsvOutputFormatTest {
 
 	private String path = null;
 
 	@Before
 	public void createFile() throws Exception {
-		path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
+		path = File.createTempFile("csv_output_test_file", ".csv").getAbsolutePath();
 	}
 
 	@Test
@@ -80,7 +83,7 @@ public class CsvOutputFormatTest {
 			} catch (RuntimeException e) {
 				// expected
 			}
-			
+
 		}
 		finally {
 			csvOutputFormat.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
index 2f403aa..9280009 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
@@ -18,8 +18,12 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
+
 import org.junit.Test;
 
+/**
+ * Tests for {@link ExecutionEnvironment#fromElements}.
+ */
 public class FromElementsTest {
 
 	@Test
@@ -34,7 +38,7 @@ public class FromElementsTest {
 		executionEnvironment.fromElements(SubType.class, new SubType(1, "Java"), new ParentType(1, "hello"));
 	}
 
-	public static class ParentType {
+	private static class ParentType {
 		int num;
 		String string;
 		public ParentType(int num, String string) {
@@ -43,7 +47,7 @@ public class FromElementsTest {
 		}
 	}
 
-	public static class SubType extends ParentType{
+	private static class SubType extends ParentType{
 		public SubType(int num, String string) {
 			super(num, string);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
index f9dc28a..d90d657 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -18,26 +18,28 @@
 
 package org.apache.flink.api.java.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 
-import org.junit.Test;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link PrimitiveInputFormat}.
+ */
 public class PrimitiveInputFormatTest {
 
 	private static final Path PATH = new Path("an/ignored/file/");
 
-
 	@Test
 	public void testStringInput() {
 		try {
@@ -71,15 +73,13 @@ public class PrimitiveInputFormatTest {
 		}
 	}
 
-
-
 	@Test
 	public void testIntegerInput() throws IOException {
 		try {
 			final String fileContent = "111|222|";
 			final FileInputSplit split = createInputSplit(fileContent);
 
-			final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);
+			final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH, "|", Integer.class);
 
 			format.configure(new Configuration());
 			format.open(split);
@@ -99,7 +99,6 @@ public class PrimitiveInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
 
 	@Test
 	public void testDoubleInputLinewise() throws IOException {
@@ -136,7 +135,7 @@ public class PrimitiveInputFormatTest {
 			String fileContent = first + "\r\n" + second + "\r\n";
 			final FileInputSplit split = createInputSplit(fileContent);
 
-			final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH ,String.class);
+			final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH, String.class);
 
 			format.configure(new Configuration());
 			format.open(split);
@@ -153,14 +152,14 @@ public class PrimitiveInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test(expected = IOException.class)
 	public void testFailingInput() throws IOException {
-		
+
 		final String fileContent = "111|222|asdf|17";
 		final FileInputSplit split = createInputSplit(fileContent);
 
-		final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);
+		final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH, "|", Integer.class);
 
 		format.configure(new Configuration());
 		format.open(split);

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index 943db36..f44d5bf 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -30,6 +29,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -45,18 +45,21 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link RowCsvInputFormat}.
+ */
 public class RowCsvInputFormatTest {
 
-	private static Path PATH = new Path("an/ignored/file/");
+	private static final Path PATH = new Path("an/ignored/file/");
 
 	// static variables for testing the removal of \r\n to \n
-	private static String FIRST_PART = "That is the first part";
-	private static String SECOND_PART = "That is the second part";
+	private static final String FIRST_PART = "That is the first part";
+	private static final String SECOND_PART = "That is the second part";
 
 	@Test
 	public void ignoreInvalidLines() throws Exception {
@@ -588,7 +591,7 @@ public class RowCsvInputFormatTest {
 		RowCsvInputFormat format = new RowCsvInputFormat(
 			PATH,
 			fieldTypes,
-			new int[]{0,3,7});
+			new int[]{0, 3, 7});
 		format.setFieldDelimiter("|x|");
 		format.configure(new Configuration());
 		format.open(split);

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 6bff9db..e78232a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -16,15 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -34,51 +33,55 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link TextInputFormat}.
+ */
 public class TextInputFormatTest {
 	@Test
 	public void testSimpleRead() {
-		final String FIRST = "First line";
-		final String SECOND = "Second line";
-		
+		final String first = "First line";
+		final String second = "Second line";
+
 		try {
 			// create input file
 			File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
 			tempFile.deleteOnExit();
 			tempFile.setWritable(true);
-			
+
 			PrintStream ps = new  PrintStream(tempFile);
-			ps.println(FIRST);
-			ps.println(SECOND);
+			ps.println(first);
+			ps.println(second);
 			ps.close();
-			
+
 			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
-			
-			Configuration parameters = new Configuration(); 
+
+			Configuration parameters = new Configuration();
 			inputFormat.configure(parameters);
-			
+
 			FileInputSplit[] splits = inputFormat.createInputSplits(1);
 			assertTrue("expected at least one input split", splits.length >= 1);
-			
+
 			inputFormat.open(splits[0]);
-			
+
 			String result = "";
-			
+
 			assertFalse(inputFormat.reachedEnd());
 			result = inputFormat.nextRecord("");
 			assertNotNull("Expecting first record here", result);
-			assertEquals(FIRST, result);
-			
+			assertEquals(first, result);
+
 			assertFalse(inputFormat.reachedEnd());
 			result = inputFormat.nextRecord(result);
 			assertNotNull("Expecting second record here", result);
-			assertEquals(SECOND, result);
-			
+			assertEquals(second, result);
+
 			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
 		}
 		catch (Throwable t) {
@@ -142,70 +145,68 @@ public class TextInputFormatTest {
 	}
 
 	/**
-	 * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed
+	 * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed.
 	 */
 	@Test
 	public void testRemovingTrailingCR() {
-		
-		testRemovingTrailingCR("\n","\n");
-		testRemovingTrailingCR("\r\n","\n");
-		
-		testRemovingTrailingCR("|","|");
-		testRemovingTrailingCR("|","\n");
+
+		testRemovingTrailingCR("\n", "\n");
+		testRemovingTrailingCR("\r\n", "\n");
+
+		testRemovingTrailingCR("|", "|");
+		testRemovingTrailingCR("|", "\n");
 	}
-	
-	private void testRemovingTrailingCR(String lineBreaker,String delimiter) {
-		File tempFile=null;
-		
-		String FIRST = "First line";
-		String SECOND = "Second line";
-		String CONTENT = FIRST + lineBreaker + SECOND + lineBreaker;
-		
+
+	private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
+		File tempFile = null;
+
+		String first = "First line";
+		String second = "Second line";
+		String content = first + lineBreaker + second + lineBreaker;
+
 		try {
 			// create input file
 			tempFile = File.createTempFile("TextInputFormatTest", "tmp");
 			tempFile.deleteOnExit();
 			tempFile.setWritable(true);
-			
+
 			OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
-			wrt.write(CONTENT);
+			wrt.write(content);
 			wrt.close();
-			
+
 			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
 			inputFormat.setFilePath(tempFile.toURI().toString());
-			
-			Configuration parameters = new Configuration(); 
+
+			Configuration parameters = new Configuration();
 			inputFormat.configure(parameters);
-			
+
 			inputFormat.setDelimiter(delimiter);
-			
+
 			FileInputSplit[] splits = inputFormat.createInputSplits(1);
-						
+
 			inputFormat.open(splits[0]);
-			
 
 			String result = "";
-			if (  (delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n") ) ) 
-					|| (lineBreaker.equals(delimiter)) ){
-				
+			if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n")))
+					|| (lineBreaker.equals(delimiter))){
+
 				result = inputFormat.nextRecord("");
 				assertNotNull("Expecting first record here", result);
-				assertEquals(FIRST, result);
-				
+				assertEquals(first, result);
+
 				result = inputFormat.nextRecord(result);
 				assertNotNull("Expecting second record here", result);
-				assertEquals(SECOND, result);
-				
+				assertEquals(second, result);
+
 				result = inputFormat.nextRecord(result);
 				assertNull("The input file is over", result);
-				
+
 			} else {
 				result = inputFormat.nextRecord("");
 				assertNotNull("Expecting first record here", result);
-				assertEquals(CONTENT, result);
+				assertEquals(content, result);
 			}
-			
-			
+
 		}
 		catch (Throwable t) {
 			System.err.println("test failed with exception: " + t.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index a119d59..676a7c4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.runner.RunWith;
@@ -38,6 +39,9 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 
+/**
+ * Tests for type serialization format.
+ */
 @RunWith(Parameterized.class)
 public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer, String>> {
 
@@ -50,7 +54,7 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In
 	public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int parallelism) {
 		super(numberOfTuples, blockSize, parallelism);
 
-        resultType = TypeExtractor.getForObject(getRecord(0));
+		resultType = TypeExtractor.getForObject(getRecord(0));
 
 		serializer = resultType.createSerializer(new ExecutionConfig());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
index 6b63bb1..d7e42e5 100644
--- a/tools/maven/suppressions-java.xml
+++ b/tools/maven/suppressions-java.xml
@@ -23,13 +23,6 @@ under the License.
 	"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
 
 <suppressions>
-	<suppress
-		files="(.*)api[/\\]java[/\\]io[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)api[/\\]java[/\\]io[/\\](.*)"
-		checks="UnusedImports|AvoidStarImport"/>
 
 	<suppress
 		files="(.*)api[/\\]java[/\\]aggregation[/\\](.*)"
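
The suppress entries removed above had exempted flink-java's api/java/io sources (and their tests) from checks such as ImportOrder, WhitespaceAround, ParenPad, UnusedImports and the Javadoc rules; the bulk of this commit is the mechanical cleanup those checks demand. As a rough illustration of what two of the whitespace checks reject (generic checkstyle behavior, not quoted from this commit):

    if( x ){ return; }      // rejected: ParenPad / WhitespaceAround violations
    if (x) { return; }      // accepted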


[4/4] flink git commit: [FLINK-6732] [java] Activate strict checkstyle for flink-java

Posted by ch...@apache.org.
[FLINK-6732] [java] Activate strict checkstyle for flink-java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c42557f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c42557f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c42557f

Branch: refs/heads/master
Commit: 3c42557f3083182b0ab66d15cb6ec8452b59464c
Parents: 8e97536
Author: zentol <ch...@apache.org>
Authored: Mon Jul 31 17:03:48 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 17:05:39 2017 +0200

----------------------------------------------------------------------
 docs/internals/ide_setup.md       |  2 +-
 flink-java/pom.xml                | 29 --------------------
 tools/maven/suppressions-java.xml | 50 ----------------------------------
 3 files changed, 1 insertion(+), 80 deletions(-)
----------------------------------------------------------------------
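
The diff below removes the module-specific checkstyle plugin from flink-java/pom.xml and deletes tools/maven/suppressions-java.xml, so that the module picks up the strict checkstyle configuration shared by the rest of the build (implied by the commit message rather than shown in the diff); the ide_setup.md change accordingly drops flink-java from the list of modules not fully covered. To verify the module locally after this change, a minimal sketch using the standard Maven goal (the exact invocation is an assumption, not part of the commit):

    mvn checkstyle:check -pl flink-java

With failOnViolation enabled, as in the removed plugin block, this goal fails the build on any violation.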


http://git-wip-us.apache.org/repos/asf/flink/blob/3c42557f/docs/internals/ide_setup.md
----------------------------------------------------------------------
diff --git a/docs/internals/ide_setup.md b/docs/internals/ide_setup.md
index c070331..6781f44 100644
--- a/docs/internals/ide_setup.md
+++ b/docs/internals/ide_setup.md
@@ -107,7 +107,7 @@ You can scan an entire module by opening the Checkstyle tools window and
 clicking the "Check Module" button. The scan should report no errors.
 
 <span class="label label-info">Note</span> Some modules are not fully covered by checkstyle,
-which include `flink-core`, `flink-java`, `flink-optimizer`, and `flink-runtime`.
+which include `flink-core`, `flink-optimizer`, and `flink-runtime`.
 Nevertheless please make sure that code you add/modify in these modules still conforms to the checkstyle rules.
 
 ## Eclipse

http://git-wip-us.apache.org/repos/asf/flink/blob/3c42557f/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 9d69ca8..2dc243e 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -85,35 +85,6 @@ under the License.
 
 	<build>
 		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-checkstyle-plugin</artifactId>
-				<version>2.17</version>
-				<dependencies>
-					<dependency>
-						<groupId>com.puppycrawl.tools</groupId>
-						<artifactId>checkstyle</artifactId>
-						<version>6.19</version>
-					</dependency>
-				</dependencies>
-				<executions>
-					<execution>
-						<id>validate</id>
-						<phase>validate</phase>
-						<goals>
-							<goal>check</goal>
-						</goals>
-					</execution>
-				</executions>
-
-				<configuration>
-					<configLocation>/tools/maven/checkstyle.xml</configLocation>
-					<suppressionsLocation>/tools/maven/suppressions-java.xml</suppressionsLocation>
-					<includeTestSourceDirectory>true</includeTestSourceDirectory>
-					<logViolationsToConsole>true</logViolationsToConsole>
-					<failOnViolation>true</failOnViolation>
-				</configuration>
-			</plugin>
 
 			<!-- activate API compatibility checks -->
 			<plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/3c42557f/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
deleted file mode 100644
index 3bb8556..0000000
--- a/tools/maven/suppressions-java.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<!DOCTYPE suppressions PUBLIC
-	"-//Puppy Crawl//DTD Suppressions 1.1//EN"
-	"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
-
-<suppressions>
-
-	<suppress
-		files="(.*)api[/\\]java[/\\]aggregation[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
-	<suppress
-		files="(.*)api[/\\]java[/\\]operators[/\\]join[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
-	<suppress
-		files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
-	<suppress
-		files="(.*)test[/\\](.*)api[/\\]java[/\\]tuple[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
-	<suppress
-		files="(.*)test[/\\](.*)api[/\\]common[/\\]io[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
-	<suppress
-		files="(.*)test[/\\](.*)api[/\\]common[/\\]operators[/\\](.*)"
-		checks="AvoidStarImport|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-</suppressions>


[2/4] flink git commit: [FLINK-7185] Activate checkstyle flink-java/io

Posted by ch...@apache.org.
[FLINK-7185] Activate checkstyle flink-java/io

This closes #4340.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9c9fb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9c9fb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9c9fb5

Branch: refs/heads/master
Commit: 0c9c9fb5cb7a8a27d444db5c725c8abd792ca761
Parents: d578810
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:31:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 16:37:52 2017 +0200

----------------------------------------------------------------------
 .../api/java/io/CollectionInputFormat.java      |  40 ++--
 .../flink/api/java/io/CsvInputFormat.java       |  17 +-
 .../flink/api/java/io/CsvOutputFormat.java      |  26 +--
 .../org/apache/flink/api/java/io/CsvReader.java |  97 ++++----
 .../api/java/io/DiscardingOutputFormat.java     |   1 -
 .../flink/api/java/io/IteratorInputFormat.java  |  12 +-
 .../java/io/LocalCollectionOutputFormat.java    |  21 +-
 .../java/io/ParallelIteratorInputFormat.java    |  20 +-
 .../flink/api/java/io/PojoCsvInputFormat.java   |   5 +
 .../flink/api/java/io/PrimitiveInputFormat.java |   5 +-
 .../flink/api/java/io/PrintingOutputFormat.java |  30 +--
 .../flink/api/java/io/RowCsvInputFormat.java    |   3 +
 .../flink/api/java/io/SplitDataProperties.java  |  85 ++++---
 .../flink/api/java/io/TextInputFormat.java      |  50 ++--
 .../flink/api/java/io/TextOutputFormat.java     |  50 ++--
 .../flink/api/java/io/TextValueInputFormat.java |  57 ++---
 .../flink/api/java/io/TupleCsvInputFormat.java  |   9 +-
 .../api/java/io/TypeSerializerOutputFormat.java |   6 +-
 .../apache/flink/api/java/io/CSVReaderTest.java |  90 ++++----
 .../api/java/io/CollectionInputFormatTest.java  |  67 +++---
 .../flink/api/java/io/CsvInputFormatTest.java   | 230 ++++++++++---------
 .../flink/api/java/io/CsvOutputFormatTest.java  |   7 +-
 .../flink/api/java/io/FromElementsTest.java     |   8 +-
 .../api/java/io/PrimitiveInputFormatTest.java   |  35 ++-
 .../api/java/io/RowCsvInputFormatTest.java      |  15 +-
 .../flink/api/java/io/TextInputFormatTest.java  | 129 +++++------
 .../api/java/io/TypeSerializerFormatTest.java   |   6 +-
 tools/maven/suppressions-java.xml               |   7 -
 28 files changed, 587 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
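
Many hunks in this commit only regroup imports. The target ordering is visible in the files below (for example CollectionInputFormat, PrimitiveInputFormatTest, TextInputFormatTest): org.apache.flink imports first, then other third-party imports such as org.junit, then java/javax imports, with static imports last and a blank line between groups. A condensed illustration of that ordering, inferred from the hunks rather than from the checkstyle rules themselves:

    import org.apache.flink.core.fs.Path;           // Flink classes first

    import org.junit.Test;                          // other third-party libraries next

    import java.io.IOException;                     // java.* / javax.* after

    import static org.junit.Assert.assertEquals;    // static imports last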


http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 90e6712..eebe56f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
@@ -34,6 +26,14 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
 /**
  * An input format that returns objects from a collection.
  */
@@ -55,7 +55,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		}
 
 		this.serializer = serializer;
-		
+
 		this.dataSet = dataSet;
 	}
 
@@ -67,10 +67,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 	@Override
 	public void open(GenericInputSplit split) throws IOException {
 		super.open(split);
-		
+
 		this.iterator = this.dataSet.iterator();
 	}
-	
+
 	@Override
 	public T nextRecord(T record) throws IOException {
 		return this.iterator.next();
@@ -80,10 +80,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		out.defaultWriteObject();
-		
+
 		final int size = dataSet.size();
 		out.writeInt(size);
-		
+
 		if (size > 0) {
 			DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
 			for (T element : dataSet){
@@ -97,7 +97,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
-		
+
 		if (collectionLength > 0) {
 			try {
 				DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(in);
@@ -113,9 +113,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 		dataSet = list;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
@@ -136,14 +136,14 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		sb.append(']');
 		return sb.toString();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static <X> void checkCollection(Collection<X> elements, Class<X> viewedAs) {
 		if (elements == null || viewedAs == null) {
 			throw new NullPointerException();
 		}
-		
+
 		for (X elem : elements) {
 			if (elem == null) {
 				throw new IllegalArgumentException("The collection must not contain null elements.");
@@ -157,7 +157,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 			if (!viewedAs.isAssignableFrom(elem.getClass()) &&
 					!(elem.getClass().toString().equals("class scala.runtime.BoxedUnit") && viewedAs.equals(void.class))) {
 
-				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + 
+				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
 							viewedAs.getCanonicalName());
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index f1a16ea..0bd4e69 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -21,13 +21,18 @@ package org.apache.flink.api.java.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import java.io.IOException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
 
+/**
+ * InputFormat that reads csv files.
+ *
+ * @param <OUT>
+ */
 @Internal
 public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
@@ -38,7 +43,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 	public static final String DEFAULT_FIELD_DELIMITER = ",";
 
 	protected transient Object[] parsedValues;
-	
+
 	protected CsvInputFormat(Path filePath) {
 		super(filePath);
 	}
@@ -63,7 +68,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
 		// left to right evaluation makes access [0] okay
 		// this marker is used to speed up readRecord, so that it doesn't have to check on each call whether the line ending is set to the default
-		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n') {
 			this.lineDelimiterIsLinebreak = true;
 		}
 
@@ -123,7 +128,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
 	protected static boolean[] createDefaultMask(int size) {
 		boolean[] includedMask = new boolean[size];
-		for (int x=0; x<includedMask.length; x++) {
+		for (int x = 0; x < includedMask.length; x++) {
 			includedMask[x] = true;
 		}
 		return includedMask;
@@ -154,5 +159,5 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 	public String toString() {
 		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index c2fe13c..44fd679 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -16,18 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +27,15 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
 /**
  * This is an OutputFormat to serialize {@link org.apache.flink.api.java.tuple.Tuple}s to text. The output is
  * structured by record delimiters and field delimiters as common in CSV files.
@@ -121,8 +122,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	/**
 	 * Configures the format to either allow null values (writing an empty field),
 	 * or to throw an exception when encountering a null field.
-	 * <p>
-	 * by default, null values are disallowed.
+	 *
+	 * <p>By default, null values are disallowed.
 	 *
 	 * @param allowNulls Flag to indicate whether the output format should accept null values.
 	 */
@@ -144,8 +145,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	 * Configures whether the output format should quote string values. String values are fields
 	 * of type {@link java.lang.String} and {@link org.apache.flink.types.StringValue}, as well as
 	 * all subclasses of the latter.
-	 * <p>
-	 * By default, strings are not quoted.
+	 *
+	 * <p>By default, strings are not quoted.
 	 *
 	 * @param quoteStrings Flag indicating whether string fields should be quoted.
 	 */
@@ -215,7 +216,6 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	}
 
     /**
-	 *
 	 * The purpose of this method is solely to check whether the data type to be processed
 	 * is in fact a tuple type.
 	 */
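
For reference, a short sketch of how these setters combine on a sink (path and delimiters are made up for illustration):

    CsvOutputFormat<Tuple2<String, Integer>> format =
            new CsvOutputFormat<>(new Path("/tmp/out.csv"), "\n", "|");
    format.setAllowNullValues(false);   // a null field causes an exception
    format.setQuoteStrings(true);       // String/StringValue fields are written quoted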

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index ce2f4fa..684911a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -23,9 +23,9 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
 import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -44,14 +44,13 @@ import java.util.Arrays;
 public class CsvReader {
 
 	private final Path path;
-	
+
 	private final ExecutionEnvironment executionContext;
-	
-	
+
 	protected boolean[] includedMask;
-	
+
 	protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
-	
+
 	protected String fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
 
 	protected String commentPrefix = null; //default: no comments
@@ -61,35 +60,35 @@ public class CsvReader {
 	protected char quoteCharacter = '"';
 
 	protected boolean skipFirstLineAsHeader = false;
-	
+
 	protected boolean ignoreInvalidLines = false;
 
 	private String charset = "UTF-8";
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 		Preconditions.checkNotNull(executionContext, "The execution context may not be null.");
-		
+
 		this.path = filePath;
 		this.executionContext = executionContext;
 	}
-	
+
 	public CsvReader(String filePath, ExecutionEnvironment executionContext) {
 		this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
 	}
-	
+
 	public Path getFilePath() {
 		return this.path;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Configures the delimiter that separates the lines/rows. The linebreak character
 	 * ({@code '\n'}) is used by default.
-	 * 
+	 *
 	 * @param delimiter The delimiter that separates the rows.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -97,15 +96,15 @@ public class CsvReader {
 		if (delimiter == null || delimiter.length() == 0) {
 			throw new IllegalArgumentException("The delimiter must not be null or an empty string");
 		}
-		
+
 		this.lineDelimiter = delimiter;
 		return this;
 	}
-	
+
 	/**
 	 * Configures the delimiter that separates the fields within a row. The comma character
 	 * ({@code ','}) is used by default.
-	 * 
+	 *
 	 * @param delimiter The delimiter that separates the fields in one row.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 *
@@ -148,7 +147,7 @@ public class CsvReader {
 	 * Configures the string that starts comments.
 	 * By default comments will be treated as invalid lines.
 	 * This function only recognizes comments which start at the beginning of the line!
-	 * 
+	 *
 	 * @param commentPrefix The string that starts the comments.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -156,7 +155,7 @@ public class CsvReader {
 		if (commentPrefix == null || commentPrefix.length() == 0) {
 			throw new IllegalArgumentException("The comment prefix must not be null or an empty string");
 		}
-		
+
 		this.commentPrefix = commentPrefix;
 		return this;
 	}
@@ -172,7 +171,7 @@ public class CsvReader {
 	}
 
 	/**
-	 * Sets the charset of the reader
+	 * Sets the charset of the reader.
 	 *
 	 * @param charset The character set to set.
 	 */
@@ -189,7 +188,7 @@ public class CsvReader {
 	 * the boolean array is {@code true}.
 	 * The number of fields in the result is consequently equal to the number of times that {@code true}
 	 * occurs in the fields array.
-	 * 
+	 *
 	 * @param fields The array of flags that describes which fields are to be included and which not.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -197,14 +196,14 @@ public class CsvReader {
 		if (fields == null || fields.length == 0) {
 			throw new IllegalArgumentException("The set of included fields must not be null or empty.");
 		}
-		
+
 		int lastTruePos = -1;
 		for (int i = 0; i < fields.length; i++) {
 			if (fields[i]) {
 				lastTruePos = i;
 			}
 		}
-		
+
 		if (lastTruePos == -1) {
 			throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
 		}
@@ -225,13 +224,13 @@ public class CsvReader {
 	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
 	 * {@code false}). The result contains the fields where the corresponding position in
 	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
-	 * 
+	 *
 	 * @param mask The string mask defining which fields to include and which to skip.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
 	public CsvReader includeFields(String mask) {
 		boolean[] includedMask = new boolean[mask.length()];
-		
+
 		for (int i = 0; i < mask.length(); i++) {
 			char c = mask.charAt(i);
 			if (c == '1' || c == 'T' || c == 't') {
@@ -240,10 +239,10 @@ public class CsvReader {
 				throw new IllegalArgumentException("Mask string may contain only '0' and '1'.");
 			}
 		}
-		
+
 		return includeFields(includedMask);
 	}
-	
+
 	/**
 	 * Configures which fields of the CSV file should be included and which should be skipped. The
 	 * bits in the value (read from least significant to most significant) define whether the field at
@@ -252,14 +251,14 @@ public class CsvReader {
 	 * non-zero bit.
 	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
 	 * include the fields where the corresponding bit is one.
-	 * <p>
-	 * Examples:
+	 *
+	 * <p>Examples:
 	 * <ul>
 	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
 	 *   <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields
 	 *       two and three, skip fields four and five, and include field six.</li>
 	 * </ul>
-	 * 
+	 *
 	 * @param mask The bit mask defining which fields to include and which to skip.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -267,36 +266,36 @@ public class CsvReader {
 		if (mask == 0) {
 			throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
 		}
-		
+
 		ArrayList<Boolean> fields = new ArrayList<Boolean>();
 
 		while (mask != 0) {
 			fields.add((mask & 0x1L) != 0);
 			mask >>>= 1;
 		}
-		
+
 		boolean[] fieldsArray = new boolean[fields.size()];
 		for (int i = 0; i < fieldsArray.length; i++) {
 			fieldsArray[i] = fields.get(i);
 		}
-		
+
 		return includeFields(fieldsArray);
 	}
 
 	/**
 	 * Sets the CSV reader to ignore the first line. This is useful for files that contain a header line.
-	 * 
+	 *
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
 	public CsvReader ignoreFirstLine() {
 		skipFirstLineAsHeader = true;
 		return this;
 	}
-	
+
 	/**
-	 * Sets the CSV reader to ignore any invalid lines. 
+	 * Sets the CSV reader to ignore any invalid lines.
 	 * This is useful for files that contain an empty line at the end, multiple header lines, or comments, which would otherwise cause an exception.
-	 * 
+	 *
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
 	public CsvReader ignoreInvalidLines(){
@@ -325,12 +324,12 @@ public class CsvReader {
 
 		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
 	 * {@link Tuple}. The type information for the fields is obtained from the type class. The type
 	 * consequently needs to specify all generic field types of the tuple.
-	 * 
+	 *
 	 * @param targetType The class of the target type, needs to be a subclass of Tuple.
 	 * @return The DataSet representing the parsed CSV data.
 	 */
@@ -339,24 +338,24 @@ public class CsvReader {
 		if (!Tuple.class.isAssignableFrom(targetType)) {
 			throw new IllegalArgumentException("The target type must be a subclass of " + Tuple.class.getName());
 		}
-		
+
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
 		CsvInputFormat<T> inputFormat = new TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask);
-		
+
 		Class<?>[] classes = new Class<?>[typeInfo.getArity()];
 		for (int i = 0; i < typeInfo.getArity(); i++) {
 			classes[i] = typeInfo.getTypeAt(i).getTypeClass();
 		}
-		
+
 		configureInputFormat(inputFormat);
 		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Miscellaneous
 	// --------------------------------------------------------------------------------------------
-	
+
 	private void configureInputFormat(CsvInputFormat<?> format) {
 		format.setCharset(this.charset);
 		format.setDelimiter(this.lineDelimiter);
@@ -368,10 +367,10 @@ public class CsvReader {
 			format.enableQuotedStringParsing(this.quoteCharacter);
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------	
+
+	// --------------------------------------------------------------------------------------------
 	// The following lines are generated.
-	// --------------------------------------------------------------------------------------------	
+	// --------------------------------------------------------------------------------------------
 	// BEGIN_OF_TUPLE_DEPENDENT_CODE
 	// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
index f01d864..7358b14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Public;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
index cb8bd6a..05f3ccd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
-import java.io.Serializable;
-import java.util.Iterator;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
 
+import java.io.Serializable;
+import java.util.Iterator;
+
 /**
  * An input format that returns objects from an iterator.
  */
@@ -35,13 +34,12 @@ public class IteratorInputFormat<T> extends GenericInputFormat<T> implements Non
 	private static final long serialVersionUID = 1L;
 
 	private Iterator<T> iterator; // input data as serializable iterator
-	
-	
+
 	public IteratorInputFormat(Iterator<T> iterator) {
 		if (!(iterator instanceof Serializable)) {
 			throw new IllegalArgumentException("The data source iterator must be serializable.");
 		}
-		
+
 		this.iterator = iterator;
 	}
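
Since the constructor rejects non-serializable iterators, a source iterator has to implement both interfaces; a minimal sketch (the iterator class is hypothetical):

    static class CountingIterator implements Iterator<Integer>, Serializable {
        private int next = 0;

        public boolean hasNext() { return next < 100; }

        public Integer next() { return next++; }
    }

    IteratorInputFormat<Integer> format = new IteratorInputFormat<>(new CountingIterator());

Note that, e.g., Arrays.asList(1, 2, 3).iterator() is not Serializable and would be rejected.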
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
index 65ed6c3..bcb1cf1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.api.java.io;
 
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.RichOutputFormat;
@@ -33,15 +26,21 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
- *  An output format that writes record into collection
+ *  An output format that adds records to a collection.
  */
 @PublicEvolving
 public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implements InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
 
-	private static Map<Integer,Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
+	private static final Map<Integer, Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
 
 	private transient ArrayList<T> taskResult;
 
@@ -67,7 +66,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
 	@Override
 	public void configure(Configuration parameters) {}
 
-
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
 		this.taskResult = new ArrayList<T>();
@@ -80,7 +78,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
 		this.taskResult.add(recordCopy);
 	}
 
-
 	@Override
 	public void close() throws IOException {
 		synchronized (RESULT_HOLDER) {
@@ -93,6 +90,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
 	@Override
 	@SuppressWarnings("unchecked")
 	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		this.typeSerializer = (TypeSerializer<T>)type.createSerializer(executionConfig);
+		this.typeSerializer = (TypeSerializer<T>) type.createSerializer(executionConfig);
 	}
 }
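
Typical usage hands in a collection and reads it back after local execution (env and data are assumed to exist):

    List<String> results = new ArrayList<>();
    data.output(new LocalCollectionOutputFormat<>(results));
    env.execute();
    // 'results' is now populated; this only works for local execution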

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
index a6ac853..25e25b4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.util.SplittableIterator;
 
+import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * An input format that generates data in parallel through a {@link SplittableIterator}.
@@ -34,25 +33,22 @@ import org.apache.flink.util.SplittableIterator;
 public class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> {
 
 	private static final long serialVersionUID = 1L;
-	
-	
+
 	private final SplittableIterator<T> source;
-	
+
 	private transient Iterator<T> splitIterator;
-	
-	
-	
+
 	public ParallelIteratorInputFormat(SplittableIterator<T> iterator) {
 		this.source = iterator;
 	}
-	
+
 	@Override
 	public void open(GenericInputSplit split) throws IOException {
 		super.open(split);
-		
+
 		this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits());
 	}
-	
+
 	@Override
 	public boolean reachedEnd() {
 		return !this.splitIterator.hasNext();
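
A quick sketch of the parallel variant, using the existing NumberSequenceIterator as the SplittableIterator (env is assumed to exist):

    ParallelIteratorInputFormat<Long> format =
            new ParallelIteratorInputFormat<>(new NumberSequenceIterator(1L, 1000L));
    DataSource<Long> numbers = env.createInput(format, BasicTypeInfo.LONG_TYPE_INFO);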

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
index 990e9e6..804d02b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Internal;
@@ -29,6 +30,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Input format that reads CSV files into POJOs.
+ * @param <OUT> resulting POJO type
+ */
 @Internal
 public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
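
Direct construction is possible, although users normally go through CsvReader#pojoType; a sketch with a hypothetical MyPojo and field list:

    PojoTypeInfo<MyPojo> typeInfo =
            (PojoTypeInfo<MyPojo>) TypeExtractor.createTypeInfo(MyPojo.class);
    PojoCsvInputFormat<MyPojo> format = new PojoCsvInputFormat<>(
            new Path("/path/to/pojos.csv"), typeInfo, new String[] {"name", "count"});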
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index d454765..794703b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -44,7 +44,6 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 
 	private transient FieldParser<OT> parser;
 
-
 	public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
 		super(filePath, null);
 		this.primitiveClass = primitiveClass;
@@ -70,7 +69,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 	public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
 		// Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
 		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
-			&& offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+			&& offset + numBytes >= 1 && bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
 			numBytes -= 1;
 		}
 
@@ -79,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 			return parser.getLastResult();
 		} else {
 			String s = new String(bytes, offset, numBytes, getCharset());
-			throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
+			throw new IOException("Could not parse value: \"" + s + "\" as type " + primitiveClass.getSimpleName());
 		}
 	}
 }
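
For illustration, reading one integer per line with this format (the path is made up; env is assumed to exist):

    PrimitiveInputFormat<Integer> format =
            new PrimitiveInputFormat<>(new Path("/path/to/ints.txt"), Integer.class);
    DataSource<Integer> ints = env.createInput(format, BasicTypeInfo.INT_TYPE_INFO);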

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index a010fd8..0ab1abb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.PrintStream;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.PrintStream;
+
+/**
+ * Output format that prints results to either stdout or stderr.
+ * @param <T> The type of the elements that are printed.
+ */
 @PublicEvolving
 public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 
@@ -37,19 +41,19 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 	private boolean target;
 
 	private transient PrintStream stream;
-	
+
 	private transient String prefix;
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Instantiates a printing output format that prints to standard out.
 	 */
 	public PrintingOutputFormat() {}
-	
+
 	/**
 	 * Instantiates a printing output format that prints to standard out.
-	 * 
+	 *
 	 * @param stdErr True, if the format should print to standard error instead of standard out.
 	 */
 	public PrintingOutputFormat(boolean stdErr) {
@@ -65,20 +69,18 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 		this(stdErr);
 		this.sinkIdentifier = sinkIdentifier;
 	}
-	
+
 	public void setTargetToStandardOut() {
 		this.target = STD_OUT;
 	}
-	
+
 	public void setTargetToStandardErr() {
 		this.target = STD_ERR;
 	}
-	
-	
+
 	@Override
 	public void configure(Configuration parameters) {}
 
-
 	@Override
 	public void open(int taskNumber, int numTasks) {
 		// get the target stream
@@ -116,9 +118,9 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 		this.prefix = null;
 		this.sinkIdentifier = null;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index b752966..15ef90e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -29,6 +29,9 @@ import org.apache.flink.types.parser.FieldParser;
 
 import java.util.Arrays;
 
+/**
+ * Input format that reads CSV files into {@link Row} records.
+ */
 @PublicEvolving
 public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultTypeQueryable<Row> {
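
A minimal construction sketch (field types and path are illustrative):

    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO };
    RowCsvInputFormat format =
            new RowCsvInputFormat(new Path("/path/to/rows.csv"), fieldTypes);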
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
index db09380..c1487f6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.common.operators.Keys;
 
 import java.util.Arrays;
 
@@ -34,7 +34,7 @@ import java.util.Arrays;
  * SplitDataProperties define data properties on {@link org.apache.flink.core.io.InputSplit}
  * generated by the {@link org.apache.flink.api.common.io.InputFormat} of a {@link DataSource}.
  *
- * InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
+ * <p>InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
  * SplitDataProperties can define that the elements which are generated by the associated InputFormat
  * are
  * <ul>
@@ -46,7 +46,7 @@ import java.util.Arrays;
  *    are in the defined order.</li>
  * </ul>
  *
- * <b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
+ * <p><b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
  * data reorganization steps such as shuffling or sorting can be avoided.
  * HOWEVER, if SplitDataProperties are not correctly defined, the result of the program might be wrong!</b>
  *
@@ -90,8 +90,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	/**
 	 * Defines that data is partitioned across input splits on the fields defined by field positions.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -106,8 +106,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that data is partitioned using a specific partitioning method
 	 * across input splits on the fields defined by field positions.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -137,8 +137,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that data is partitioned across input splits on the fields defined by field expressions.
  	 * Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -154,8 +154,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * across input splits on the fields defined by field expressions.
 	 * Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -165,7 +165,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, String partitionFields) {
 
-		if(partitionFields == null) {
+		if (partitionFields == null) {
 			throw new InvalidProgramException("PartitionFields may not be null.");
 		}
 
@@ -175,7 +175,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 		}
 
 		this.splitPartitionKeys = getAllFlatKeys(partitionKeysA);
-		if(partitionMethodId != null) {
+		if (partitionMethodId != null) {
 			this.splitPartitioner = new SourcePartitionerMarker<>(partitionMethodId);
 		}
 		else {
@@ -189,8 +189,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that the data within an input split is grouped on the fields defined by the field positions.
 	 * All records sharing the same key (combination) must be subsequently emitted by the input
 	 * format for each input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -199,13 +199,13 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsGroupedBy(int... groupFields) {
 
-		if(groupFields == null) {
+		if (groupFields == null) {
 			throw new InvalidProgramException("GroupFields may not be null.");
 		} else if (groupFields.length == 0) {
 			throw new InvalidProgramException("GroupFields may not be empty.");
 		}
 
-		if(this.splitOrdering != null) {
+		if (this.splitOrdering != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
@@ -219,8 +219,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records sharing the same key (combination) must be subsequently emitted by the input
 	 * format for each input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -229,7 +229,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsGroupedBy(String groupFields) {
 
-		if(groupFields == null) {
+		if (groupFields == null) {
 			throw new InvalidProgramException("GroupFields may not be null.");
 		}
 
@@ -238,7 +238,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 			throw new InvalidProgramException("GroupFields may not be empty.");
 		}
 
-		if(this.splitOrdering != null) {
+		if (this.splitOrdering != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
@@ -251,8 +251,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that the data within an input split is sorted on the fields defined by the field positions
 	 * in the specified orders.
 	 * All records of an input split must be emitted by the input format in the defined order.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -262,7 +262,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsOrderedBy(int[] orderFields, Order[] orders) {
 
-		if(orderFields == null || orders == null) {
+		if (orderFields == null || orders == null) {
 			throw new InvalidProgramException("OrderFields or Orders may not be null.");
 		} else if (orderFields.length == 0) {
 			throw new InvalidProgramException("OrderFields may not be empty.");
@@ -272,17 +272,17 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 			throw new InvalidProgramException("Number of OrderFields and Orders must match.");
 		}
 
-		if(this.splitGroupKeys != null) {
+		if (this.splitGroupKeys != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
 		this.splitOrdering = new Ordering();
 
-		for(int i=0; i<orderFields.length; i++) {
+		for (int i = 0; i < orderFields.length; i++) {
 			int pos = orderFields[i];
 			int[] flatKeys = this.getAllFlatKeys(new int[]{pos});
 
-			for(int key : flatKeys) {
+			for (int key : flatKeys) {
 				// check for duplicates
 				for (int okey : splitOrdering.getFieldPositions()) {
 					if (key == okey) {
@@ -290,7 +290,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 					}
 				}
 				// append key
-				this.splitOrdering.appendOrdering(key, null, orders[i] );
+				this.splitOrdering.appendOrdering(key, null, orders[i]);
 			}
 		}
 		return this;
@@ -300,8 +300,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that the data within an input split is sorted on the fields defined by the field expressions
 	 * in the specified orders. Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records of an input split must be emitted by the input format in the defined order.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -311,7 +311,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsOrderedBy(String orderFields, Order[] orders) {
 
-		if(orderFields == null || orders == null) {
+		if (orderFields == null || orders == null) {
 			throw new InvalidProgramException("OrderFields or Orders may not be null.");
 		}
 
@@ -324,18 +324,18 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 			throw new InvalidProgramException("Number of OrderFields and Orders must match.");
 		}
 
-		if(this.splitGroupKeys != null) {
+		if (this.splitGroupKeys != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
 		this.splitOrdering = new Ordering();
 
-		for(int i=0; i<orderKeysA.length; i++) {
+		for (int i = 0; i < orderKeysA.length; i++) {
 			String keyExp = orderKeysA[i];
 			Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
 			int[] flatKeys = ek.computeLogicalKeyPositions();
 
-			for(int key : flatKeys) {
+			for (int key : flatKeys) {
 				// check for duplicates
 				for (int okey : splitOrdering.getFieldPositions()) {
 					if (key == okey) {
@@ -343,7 +343,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 					}
 				}
 				// append key
-				this.splitOrdering.appendOrdering(key, null, orders[i] );
+				this.splitOrdering.appendOrdering(key, null, orders[i]);
 			}
 		}
 		return this;
@@ -365,25 +365,24 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 		return this.splitOrdering;
 	}
 
-
 	/////////////////////// FLAT FIELD EXTRACTION METHODS
 
 	private int[] getAllFlatKeys(String[] fieldExpressions) {
 
 		int[] allKeys = null;
 
-		for(String keyExp : fieldExpressions) {
+		for (String keyExp : fieldExpressions) {
 			Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
 			int[] flatKeys = ek.computeLogicalKeyPositions();
 
-			if(allKeys == null) {
+			if (allKeys == null) {
 				allKeys = flatKeys;
 			} else {
 				// check for duplicates
-				for(int key1 : flatKeys) {
-					for(int key2 : allKeys) {
-						if(key1 == key2) {
-							throw new InvalidProgramException("Duplicate fields in field expression "+keyExp);
+				for (int key1 : flatKeys) {
+					for (int key2 : allKeys) {
+						if (key1 == key2) {
+							throw new InvalidProgramException("Duplicate fields in field expression " + keyExp);
 						}
 					}
 				}
@@ -425,7 +424,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 
 		@Override
 		public boolean equals(Object o) {
-			if(o instanceof SourcePartitionerMarker) {
+			if (o instanceof SourcePartitionerMarker) {
 				return this.partitionMarker.equals(((SourcePartitionerMarker<?>) o).partitionMarker);
 			} else {
 				return false;
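
To make the caveats concrete, a sketch of declaring such properties on a source (the key position and ordering are assumptions about the input data, which is exactly where wrong declarations produce wrong results; env and format are assumed to exist):

    DataSource<Tuple2<Long, String>> source = env.createInput(format);
    source.getSplitDataProperties()
            .splitsPartitionedBy(0)
            .splitsOrderedBy(new int[] {0}, new Order[] {Order.ASCENDING});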

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index b2554bf..7d4cf9c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -18,61 +18,63 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.IOException;
-import java.nio.charset.Charset;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Input format that reads text files. Each line results in one element.
+ */
 @PublicEvolving
 public class TextInputFormat extends DelimitedInputFormat<String> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	/**
-	 * Code of \r, used to remove \r from a line when the line ends with \r\n
+	 * Code of \r, used to remove \r from a line when the line ends with \r\n.
 	 */
 	private static final byte CARRIAGE_RETURN = (byte) '\r';
 
 	/**
-	 * Code of \n, used to identify if \n is used as delimiter
+	 * Code of \n, used to identify if \n is used as delimiter.
 	 */
 	private static final byte NEW_LINE = (byte) '\n';
-	
-	
+
 	/**
 	 * The name of the charset to use for decoding.
 	 */
 	private String charsetName = "UTF-8";
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public TextInputFormat(Path filePath) {
 		super(filePath, null);
 	}
-	
-	// --------------------------------------------------------------------------------------------	
-	
+
+	// --------------------------------------------------------------------------------------------
+
 	public String getCharsetName() {
 		return charsetName;
 	}
-	
+
 	public void setCharsetName(String charsetName) {
 		if (charsetName == null) {
 			throw new IllegalArgumentException("Charset must not be null.");
 		}
-		
+
 		this.charsetName = charsetName;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void configure(Configuration parameters) {
 		super.configure(parameters);
-		
+
 		if (charsetName == null || !Charset.isSupported(charsetName)) {
 			throw new RuntimeException("Unsupported charset: " + charsetName);
 		}
@@ -83,17 +85,17 @@ public class TextInputFormat extends DelimitedInputFormat<String> {
 	@Override
 	public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
 		//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
-		if (this.getDelimiter() != null && this.getDelimiter().length == 1 
-				&& this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1 
-				&& bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+		if (this.getDelimiter() != null && this.getDelimiter().length == 1
+				&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
+				&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
 			numBytes -= 1;
 		}
-		
+
 		return new String(bytes, offset, numBytes, this.charsetName);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "TextInputFormat (" + getFilePath() + ") - " + this.charsetName;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
index d466082..006b571 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
@@ -18,65 +18,75 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
+/**
+ * A {@link FileOutputFormat} that writes objects to a text file.
+ *
+ * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
+ * @param <T> type of elements
+ */
 @PublicEvolving
 public class TextOutputFormat<T> extends FileOutputFormat<T> {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final int NEWLINE = '\n';
 
 	private String charsetName;
-	
+
 	private transient Charset charset;
 
 	// --------------------------------------------------------------------------------------------
 
-	public static interface TextFormatter<IN> extends Serializable {
-		public String format(IN value);
+
+	/**
+	 * Formatter that transforms values into their {@link String} representations.
+	 * @param <IN> type of input elements
+	 */
+	public interface TextFormatter<IN> extends Serializable {
+		String format(IN value);
 	}
 
 	public TextOutputFormat(Path outputPath) {
 		this(outputPath, "UTF-8");
 	}
-	
+
 	public TextOutputFormat(Path outputPath, String charset) {
 		super(outputPath);
 		this.charsetName = charset;
 	}
-	
-	
+
 	public String getCharsetName() {
 		return charsetName;
 	}
-	
+
 	public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
 		if (charsetName == null) {
 			throw new NullPointerException();
 		}
-		
+
 		if (!Charset.isSupported(charsetName)) {
 			throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
 		}
-		
+
 		this.charsetName = charsetName;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
 		super.open(taskNumber, numTasks);
-		
+
 		try {
 			this.charset = Charset.forName(charsetName);
 		}
@@ -87,16 +97,16 @@ public class TextOutputFormat<T> extends FileOutputFormat<T> {
 			throw new IOException("The charset " + charsetName + " is not supported.", e);
 		}
 	}
-	
+
 	@Override
 	public void writeRecord(T record) throws IOException {
 		byte[] bytes = record.toString().getBytes(charset);
 		this.stream.write(bytes);
 		this.stream.write(NEWLINE);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
index 45a2e3e..4721439 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.StringValue;
+
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -26,61 +32,58 @@ import java.nio.charset.CharsetDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.StringValue;
-
+/**
+ * Input format that reads text files into {@link StringValue} records.
+ */
 @PublicEvolving
 public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private String charsetName = "UTF-8";
-	
+
 	private boolean skipInvalidLines;
-	
+
 	private transient CharsetDecoder decoder;
-	
+
 	private transient ByteBuffer byteWrapper;
-	
+
 	private transient boolean ascii;
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public TextValueInputFormat(Path filePath) {
 		super(filePath, null);
 	}
-	
-	// --------------------------------------------------------------------------------------------	
-	
+
+	// --------------------------------------------------------------------------------------------
+
 	public String getCharsetName() {
 		return charsetName;
 	}
-	
+
 	public void setCharsetName(String charsetName) {
 		if (charsetName == null) {
 			throw new IllegalArgumentException("The charset name may not be null.");
 		}
-		
+
 		this.charsetName = charsetName;
 	}
-	
+
 	public boolean isSkipInvalidLines() {
 		return skipInvalidLines;
 	}
-	
+
 	public void setSkipInvalidLines(boolean skipInvalidLines) {
 		this.skipInvalidLines = skipInvalidLines;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void configure(Configuration parameters) {
 		super.configure(parameters);
-		
+
 		if (charsetName == null || !Charset.isSupported(charsetName)) {
 			throw new RuntimeException("Unsupported charset: " + charsetName);
 		}
@@ -88,7 +91,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
 		if (charsetName.equalsIgnoreCase(StandardCharsets.US_ASCII.name())) {
 			ascii = true;
 		}
-		
+
 		this.decoder = Charset.forName(charsetName).newDecoder();
 		this.byteWrapper = ByteBuffer.allocate(1);
 	}
@@ -109,7 +112,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
 			}
 			byteWrapper.limit(offset + numBytes);
 			byteWrapper.position(offset);
-				
+
 			try {
 				CharBuffer result = this.decoder.decode(byteWrapper);
 				reuse.setValue(result);
@@ -126,9 +129,9 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "TextValueInputFormat (" + getFilePath() + ") - " + this.charsetName + (this.skipInvalidLines ? "(skipping invalid lines)" : "");

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
index 6efd566..887620a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -15,15 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.Path;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.core.fs.Path;
 
+/**
+ * Input format that reads CSV files into tuples.
+ */
 @Internal
 public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
 
@@ -59,7 +62,7 @@ public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
 		super(filePath);
 		configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
 	}
-	
+
 	private void configure(String lineDelimiter, String fieldDelimiter,
 			TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
index 81a142e..108448d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
@@ -36,16 +36,16 @@ import java.io.IOException;
 public class TypeSerializerOutputFormat<T> extends BinaryOutputFormat<T> implements InputTypeConfigurable {
 
 	private static final long serialVersionUID = -6653022644629315158L;
-	
+
 	private TypeSerializer<T> serializer;
 
 	@Override
 	protected void serialize(T record, DataOutputView dataOutput) throws IOException {
-		if(serializer == null){
+		if (serializer == null) {
 			throw new RuntimeException("TypeSerializerOutputFormat requires a type serializer to " +
 					"be defined.");
 		}
-		
+
 		serializer.serialize(record, dataOutput);
 	}
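
When used as a sink, the serializer is supplied through setInputType; a minimal sketch (data and the output path are assumed):

    TypeSerializerOutputFormat<Tuple2<String, Integer>> format = new TypeSerializerOutputFormat<>();
    format.setOutputFilePath(new Path("/tmp/binary-out"));
    data.output(format);   // output(...) calls setInputType(...) with the data set's type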
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index de57e5c..5622930 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -59,7 +60,7 @@ public class CSVReaderTest {
 		reader.ignoreFirstLine();
 		Assert.assertTrue(reader.skipFirstLineAsHeader);
 	}
-	
+
 	@Test
 	public void testIgnoreInvalidLinesConfigure() {
 		CsvReader reader = getCsvReader();
@@ -67,7 +68,7 @@ public class CSVReaderTest {
 		reader.ignoreInvalidLines();
 		Assert.assertTrue(reader.ignoreInvalidLines);
 	}
-	
+
 	@Test
 	public void testIgnoreComments() {
 		CsvReader reader = getCsvReader();
@@ -89,38 +90,38 @@ public class CSVReaderTest {
 		CsvReader reader = getCsvReader();
 		reader.includeFields(true, true, true);
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("ttt");
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("TTT");
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("111");
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields(0x7L);
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
 	}
-	
+
 	@Test
 	public void testIncludeFieldsSparse() {
 		CsvReader reader = getCsvReader();
 		reader.includeFields(false, true, true, false, false, true, false, false);
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("fttfftff");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("FTTFFTFF");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("01100100");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
@@ -128,16 +129,16 @@ public class CSVReaderTest {
 		reader = getCsvReader();
 		reader.includeFields("0t1f0TFF");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields(0x26L);
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
 	}
-	
+
 	@Test
 	public void testIllegalCharInStringMask() {
 		CsvReader reader = getCsvReader();
-		
+
 		try {
 			reader.includeFields("1t0Tfht");
 			Assert.fail("Reader accepted an invalid mask string");
@@ -146,12 +147,11 @@ public class CSVReaderTest {
 			// expected
 		}
 	}
-	
-	
+
 	@Test
 	public void testIncludeFieldsErrorWhenExcludingAll() {
 		CsvReader reader = getCsvReader();
-		
+
 		try {
 			reader.includeFields(false, false, false, false, false, false);
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -159,7 +159,7 @@ public class CSVReaderTest {
 		catch (IllegalArgumentException e) {
 			// all good
 		}
-		
+
 		try {
 			reader.includeFields(0);
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -167,7 +167,7 @@ public class CSVReaderTest {
 		catch (IllegalArgumentException e) {
 			// all good
 		}
-		
+
 		try {
 			reader.includeFields("ffffffffffffff");
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -175,7 +175,7 @@ public class CSVReaderTest {
 		catch (IllegalArgumentException e) {
 			// all good
 		}
-		
+
 		try {
 			reader.includeFields("00000000000000000");
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -191,12 +191,12 @@ public class CSVReaderTest {
 		DataSource<Item> items = reader.tupleType(Item.class);
 		Assert.assertTrue(items.getType().getTypeClass() == Item.class);
 	}
-	
+
 	@Test
 	public void testFieldTypes() throws Exception {
 		CsvReader reader = getCsvReader();
 		DataSource<Item> items = reader.tupleType(Item.class);
-		
+
 		TypeInformation<?> info = items.getType();
 		if (!info.isTupleType()) {
 			Assert.fail();
@@ -208,44 +208,44 @@ public class CSVReaderTest {
 			Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
 
 		}
-		
+
 		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) items.getInputFormat();
 		Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
 	}
-	
+
 	@Test
 	public void testSubClass() throws Exception {
 		CsvReader reader = getCsvReader();
 		DataSource<SubItem> sitems = reader.tupleType(SubItem.class);
 		TypeInformation<?> info = sitems.getType();
-		
+
 		Assert.assertEquals(true, info.isTupleType());
 		Assert.assertEquals(SubItem.class, info.getTypeClass());
-		
+
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-		
+
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
 		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
-		
+
 		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
 		Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
 	}
-	
+
 	@Test
 	public void testSubClassWithPartialsInHierarchie() throws Exception {
 		CsvReader reader = getCsvReader();
 		DataSource<FinalItem> sitems = reader.tupleType(FinalItem.class);
 		TypeInformation<?> info = sitems.getType();
-		
+
 		Assert.assertEquals(true, info.isTupleType());
 		Assert.assertEquals(FinalItem.class, info.getTypeClass());
-		
+
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-		
+
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
 		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
@@ -253,15 +253,15 @@ public class CSVReaderTest {
 		Assert.assertEquals(ValueTypeInfo.class, tinfo.getTypeAt(4).getClass());
 		Assert.assertEquals(StringValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(3)).getTypeClass());
 		Assert.assertEquals(LongValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(4)).getTypeClass());
-		
+
 		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
 		Assert.assertArrayEquals(new Class<?>[] {Integer.class, String.class, Double.class, StringValue.class, LongValue.class}, inputFormat.getFieldTypes());
 	}
-	
+
 	@Test
 	public void testUnsupportedPartialitem() throws Exception {
 		CsvReader reader = getCsvReader();
-		
+
 		try {
 			reader.tupleType(PartialItem.class);
 			Assert.fail("tupleType() accepted an underspecified generic class.");
@@ -295,32 +295,32 @@ public class CSVReaderTest {
 		// CsvReader doesn't support custom Value type
 		reader.types(ValueItem.class);
 	}
-	
+
 	private static CsvReader getCsvReader() {
 		return new CsvReader("/some/none/existing/path", ExecutionEnvironment.createLocalEnvironment(1));
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Custom types for testing
 	// --------------------------------------------------------------------------------------------
-	
-	public static class Item extends Tuple4<Integer, String, Double, String> {
+
+	private static class Item extends Tuple4<Integer, String, Double, String> {
 		private static final long serialVersionUID = -7444437337392053502L;
 	}
-	
-	public static class SubItem extends Item {
+
+	private static class SubItem extends Item {
 		private static final long serialVersionUID = 1L;
 	}
-	
-	public static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
+
+	private static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
 		private static final long serialVersionUID = 1L;
 	}
-	
-	public static class FinalItem extends PartialItem<String, StringValue, LongValue> {
+
+	private static class FinalItem extends PartialItem<String, StringValue, LongValue> {
 		private static final long serialVersionUID = 1L;
 	}
 
-	public static class ValueItem implements Value {
+	private static class ValueItem implements Value {
 		private int v1;
 
 		public int getV1() {

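[Editor's sketch, not part of the commit: the mask tests above check three equivalent spellings of field projection on CsvReader. The input path and column types below are made up for illustration; the masks mirror the expected {false, true, true, false, false, true} array from the tests.]

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class IncludeFieldsSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Keep fields 1, 2 and 5 of an 8-column file; all three masks are
		// equivalent and yield {false, true, true, false, false, true}.
		DataSet<Tuple3<String, Double, Long>> viaBooleans = env
				.readCsvFile("/data/records.csv") // hypothetical path
				.includeFields(false, true, true, false, false, true)
				.types(String.class, Double.class, Long.class);

		DataSet<Tuple3<String, Double, Long>> viaString = env
				.readCsvFile("/data/records.csv")
				.includeFields("01100100") // '0'/'1' (or 'f'/'t') per column
				.types(String.class, Double.class, Long.class);

		DataSet<Tuple3<String, Double, Long>> viaBitmask = env
				.readCsvFile("/data/records.csv")
				.includeFields(0x26L) // bits 1, 2 and 5 set
				.types(String.class, Double.class, Long.class);

		viaBooleans.print(); // viaString and viaBitmask read identically
	}
}
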
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 4dabaca..77945cc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.api.java.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -33,6 +28,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -47,9 +43,17 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CollectionInputFormat}.
+ */
 public class CollectionInputFormatTest {
-	
-	public static class ElementType {
+
+	private static class ElementType {
 		private final int id;
 
 		public ElementType(){
@@ -73,7 +77,7 @@ public class CollectionInputFormatTest {
 				return false;
 			}
 		}
-		
+
 		@Override
 		public int hashCode() {
 			return id;
@@ -90,7 +94,8 @@ public class CollectionInputFormatTest {
 	@Test
 	public void testSerializability() {
 		try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-			 ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+			ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+
 			Collection<ElementType> inputCollection = new ArrayList<ElementType>();
 			ElementType element1 = new ElementType(1);
 			ElementType element2 = new ElementType(2);
@@ -98,10 +103,10 @@ public class CollectionInputFormatTest {
 			inputCollection.add(element1);
 			inputCollection.add(element2);
 			inputCollection.add(element3);
-	
+
 			@SuppressWarnings("unchecked")
 			TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
-	
+
 			CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
 					info.createSerializer(new ExecutionConfig()));
 
@@ -121,23 +126,23 @@ public class CollectionInputFormatTest {
 			inputFormat.open(inputSplit);
 			result.open(inputSplit);
 
-			while(!inputFormat.reachedEnd() && !result.reachedEnd()){
+			while (!inputFormat.reachedEnd() && !result.reachedEnd()){
 				ElementType expectedElement = inputFormat.nextRecord(null);
 				ElementType actualElement = result.nextRecord(null);
 
 				assertEquals(expectedElement, actualElement);
 			}
 		}
-		catch(Exception e) {
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.toString());
 		}
 
 	}
-	
+
 	@Test
 	public void testSerializabilityStrings() {
-		
+
 		final String[] data = new String[] {
 				"To be, or not to be,--that is the question:--",
 				"Whether 'tis nobler in the mind to suffer",
@@ -175,33 +180,33 @@ public class CollectionInputFormatTest {
 				"The fair Ophelia!--Nymph, in thy orisons",
 				"Be all my sins remember'd."
 		};
-		
+
 		try {
 			List<String> inputCollection = Arrays.asList(data);
 			CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
-			
+
 			// serialize
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
 			ObjectOutputStream oos = new ObjectOutputStream(baos);
 			oos.writeObject(inputFormat);
 			oos.close();
-			
+
 			// deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 			ObjectInputStream ois = new ObjectInputStream(bais);
 			Object result = ois.readObject();
-			
+
 			assertTrue(result instanceof CollectionInputFormat);
-			
+
 			int i = 0;
 			@SuppressWarnings("unchecked")
 			CollectionInputFormat<String> in = (CollectionInputFormat<String>) result;
 			in.open(new GenericInputSplit(0, 1));
-			
+
 			while (!in.reachedEnd()) {
 				assertEquals(data[i++], in.nextRecord(""));
 			}
-			
+
 			assertEquals(data.length, i);
 		}
 		catch (Exception e) {
@@ -209,7 +214,7 @@ public class CollectionInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializationFailure() {
 		try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
@@ -217,7 +222,7 @@ public class CollectionInputFormatTest {
 			// a mock serializer that fails when writing
 			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
 					Collections.singleton(new ElementType()), new TestSerializer(false, true));
-			
+
 			try {
 				out.writeObject(inFormat);
 				fail("should throw an exception");
@@ -234,21 +239,21 @@ public class CollectionInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testDeserializationFailure() {
 		try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-			 ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+			ObjectOutputStream out = new ObjectOutputStream(buffer)) {
 			// a mock serializer that fails when writing
 			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
 					Collections.singleton(new ElementType()), new TestSerializer(true, false));
 
 			out.writeObject(inFormat);
 			out.close();
-			
+
 			ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toByteArray());
 			ObjectInputStream in = new ObjectInputStream(bais);
-			
+
 			try {
 				in.readObject();
 				fail("should throw an exception");
@@ -296,14 +301,14 @@ public class CollectionInputFormatTest {
 	private static class TestException extends IOException{
 		private static final long serialVersionUID = 1L;
 	}
-	
+
 	private static class TestSerializer extends TypeSerializer<ElementType> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private final boolean failOnRead;
 		private final boolean failOnWrite;
-		
+
 		public TestSerializer(boolean failOnRead, boolean failOnWrite) {
 			this.failOnRead = failOnRead;
 			this.failOnWrite = failOnWrite;

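[Editor's sketch, not part of the commit: a standalone version of the round trip that testSerializabilityStrings exercises. CollectionInputFormat ships its elements via Java serialization, so an ObjectOutputStream/ObjectInputStream pair is all the round trip needs.]

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.core.io.GenericInputSplit;

public class RoundTripSketch {
	public static void main(String[] args) throws Exception {
		CollectionInputFormat<String> in = new CollectionInputFormat<>(
				Arrays.asList("a", "b", "c"),
				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));

		// serialize
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
			oos.writeObject(in);
		}

		// deserialize and read the elements back
		try (ObjectInputStream ois = new ObjectInputStream(
				new ByteArrayInputStream(baos.toByteArray()))) {
			@SuppressWarnings("unchecked")
			CollectionInputFormat<String> copy =
					(CollectionInputFormat<String>) ois.readObject();
			copy.open(new GenericInputSplit(0, 1));
			while (!copy.reachedEnd()) {
				System.out.println(copy.nextRecord(""));
			}
		}
	}
}
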

[3/4] flink git commit: [FLINK-7191] Activate checkstyle flink-java/operators/translation

Posted by ch...@apache.org.
[FLINK-7191] Activate checkstyle flink-java/operators/translation

This closes #4334.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e975362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e975362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e975362

Branch: refs/heads/master
Commit: 8e975362312c727fd602429778bc1c3628b95619
Parents: 0c9c9fb
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:42:57 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 17:04:26 2017 +0200

----------------------------------------------------------------------
 .../CombineToGroupCombineWrapper.java           |   1 +
 .../translation/KeyExtractingMapper.java        |  23 +--
 .../translation/KeyRemovingMapper.java          |   9 +-
 .../PlanBothUnwrappingCoGroupOperator.java      |  22 +--
 .../translation/PlanFilterOperator.java         |  17 +-
 .../PlanLeftUnwrappingCoGroupOperator.java      |  15 +-
 .../translation/PlanProjectOperator.java        |  25 +--
 .../PlanRightUnwrappingCoGroupOperator.java     |  11 +-
 .../PlanUnwrappingGroupCombineOperator.java     |  29 ++--
 .../PlanUnwrappingReduceGroupOperator.java      |  47 +++---
 .../PlanUnwrappingReduceOperator.java           |  12 +-
 ...lanUnwrappingSortedGroupCombineOperator.java |  15 +-
 ...PlanUnwrappingSortedReduceGroupOperator.java |  23 ++-
 .../RichCombineToGroupCombineWrapper.java       |   3 +-
 .../translation/Tuple3UnwrappingIterator.java   |   4 +-
 .../translation/Tuple3WrappingCollector.java    |   3 +-
 .../translation/TupleLeftUnwrappingJoiner.java  |   8 +
 .../translation/TupleRightUnwrappingJoiner.java |   8 +
 .../translation/TupleUnwrappingIterator.java    |  14 +-
 .../translation/TupleUnwrappingJoiner.java      |   8 +
 .../translation/TupleWrappingCollector.java     |  13 +-
 .../translation/TwoKeyExtractingMapper.java     |  10 +-
 .../operators/translation/WrappingFunction.java |  13 +-
 .../translation/AggregateTranslationTest.java   |  35 ++--
 .../translation/CoGroupSortTranslationTest.java |  53 +++---
 .../DeltaIterationTranslationTest.java          | 160 ++++++++++---------
 .../translation/DistinctTranslationTest.java    |  15 +-
 .../translation/ReduceTranslationTests.java     |  98 ++++++------
 tools/maven/suppressions-java.xml               |   8 -
 29 files changed, 375 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
index 408d4b3..f574218 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index f35b950..1f0cc1d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -19,34 +19,37 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+/**
+ * Mapper that extracts keys.
+ * @param <T> type of value
+ * @param <K> type of key
+ */
 @Internal
 @ForwardedFields("*->1")
 public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private final KeySelector<T, K> keySelector;
-	
+
 	private final Tuple2<K, T> tuple = new Tuple2<K, T>();
-	
-	
+
 	public KeyExtractingMapper(KeySelector<T, K> keySelector) {
 		this.keySelector = keySelector;
 	}
-	
-	
+
 	@Override
 	public Tuple2<K, T> map(T value) throws Exception {
-		
+
 		K key = keySelector.getKey(value);
 		tuple.f0 = key;
 		tuple.f1 = value;
-		
+
 		return tuple;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index 5f0de32..920f893 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -23,12 +23,17 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+/**
+ * Mapper that removes keys.
+ * @param <T> type of values
+ * @param <K> type of keys
+ */
 @Internal
 @ForwardedFields("1->*")
 public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
 	public T map(Tuple2<K, T> value) {
 		return value.f1;

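[Editor's sketch, not part of the commit: how the two mappers documented above cooperate. Both are @Internal classes normally instantiated by the DataSet translation layer, so this is illustration only; the key selector and sample value are made up.]

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
import org.apache.flink.api.java.tuple.Tuple2;

public class KeyWrapSketch {
	public static void main(String[] args) throws Exception {
		KeySelector<String, Integer> byLength = new KeySelector<String, Integer>() {
			@Override
			public Integer getKey(String value) {
				return value.length();
			}
		};

		// On the way into a keyed operation, values are wrapped as
		// Tuple2<key, value>; afterwards the key is stripped again.
		KeyExtractingMapper<String, Integer> extract =
				new KeyExtractingMapper<>(byLength);
		KeyRemovingMapper<String, Integer> remove = new KeyRemovingMapper<>();

		Tuple2<Integer, String> keyed = extract.map("hello"); // (5,hello)
		String back = remove.map(keyed);                      // "hello"
		System.out.println(keyed + " -> " + back);
	}
}
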
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
index 1814329..6ccf0f3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * A co group operator that applies the operation only on the unwrapped values.
+ */
 @Internal
 public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
-		extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
-{
+		extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> {
 
 	public PlanBothUnwrappingCoGroupOperator(
 			CoGroupFunction<I1, I2, OUT> udf,
@@ -52,23 +54,21 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				name);
 	}
 
-	public static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
+	private static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
 			extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
-			implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
-	{
+			implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private final TupleUnwrappingIterator<I1, K> iter1;
 		private final TupleUnwrappingIterator<I2, K> iter2;
-		
+
 		private TupleBothUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
 			super(wrapped);
-			
+
 			this.iter1 = new TupleUnwrappingIterator<I1, K>();
 			this.iter2 = new TupleUnwrappingIterator<I2, K>();
 		}
 
-
 		@Override
 		public void coGroup(
 				Iterable<Tuple2<K, I1>> records1,
@@ -79,6 +79,6 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
 			iter2.set(records2.iterator());
 			this.wrappedFunction.coGroup(iter1, iter2, out);
 		}
-		
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index c93191f..ecf1aac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -27,26 +27,33 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.util.Collector;
 
+/**
+ * @see FilterOperatorBase
+ * @param <T>
+ */
 @Internal
 @ForwardedFields("*")
 public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> {
-	
+
 	public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) {
 		super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name);
 	}
 
+	/**
+	 * @see FlatMapFunction
+	 * @param <T>
+	 */
 	public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>>
-		implements FlatMapFunction<T, T>
-	{
+		implements FlatMapFunction<T, T> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private FlatMapFilter(FilterFunction<T> wrapped) {
 			super(wrapped);
 		}
 
 		@Override
-		public final void flatMap(T value, Collector<T> out) throws Exception {
+		public void flatMap(T value, Collector<T> out) throws Exception {
 			if (this.wrappedFunction.filter(value)) {
 				out.collect(value);
 			}

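[Editor's sketch, not part of the commit: the FlatMapFilter above is small enough to restate standalone. The same adapter idea, a FilterFunction exposed through the FlatMapFunction interface, emits a value only when the predicate holds.]

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class FilterAsFlatMap<T> implements FlatMapFunction<T, T> {

	private final FilterFunction<T> filter;

	public FilterAsFlatMap(FilterFunction<T> filter) {
		this.filter = filter;
	}

	@Override
	public void flatMap(T value, Collector<T> out) throws Exception {
		// forward the value only if the wrapped predicate accepts it
		if (filter.filter(value)) {
			out.collect(value);
		}
	}
}
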
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
index 78840ce..b2a6937 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the left.
+ */
 @Internal
 public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
-		extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>>
-{
+		extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>> {
 
 	public PlanLeftUnwrappingCoGroupOperator(
 			CoGroupFunction<I1, I2, OUT> udf,
@@ -52,21 +54,20 @@ public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				name);
 	}
 
-	public static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
+	private static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
 			extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
 			implements CoGroupFunction<Tuple2<K, I1>, I2, OUT> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private final TupleUnwrappingIterator<I1, K> iter1;
 
 		private TupleLeftUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
 			super(wrapped);
-			
+
 			this.iter1 = new TupleUnwrappingIterator<I1, K>();
 		}
 
-
 		@Override
 		public void coGroup(
 				Iterable<Tuple2<K, I1>> records1,

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index fe981a5..3960807 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -27,30 +27,33 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 
+/**
+ * A map operator that retains a subset of fields from incoming tuples.
+ *
+ * @param <T> Input tuple type
+ * @param <R> Output tuple type
+ */
 @Internal
 public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
 
 	public PlanProjectOperator(int[] fields, String name,
 								TypeInformation<T> inType, TypeInformation<R> outType,
-								ExecutionConfig executionConfig)
-	{
+								ExecutionConfig executionConfig) {
 		super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
 		return (MapFunction<T, R>) new MapProjector<X, R>(fields);
 	}
-	
-	
-	public static final class MapProjector<T extends Tuple, R extends Tuple> 
-			extends AbstractRichFunction implements MapFunction<T, R>
-	{
+
+	private static final class MapProjector<T extends Tuple, R extends Tuple>
+			extends AbstractRichFunction implements MapFunction<T, R> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private final int[] fields;
 		private final Tuple outTuple;
-		
+
 		private MapProjector(int[] fields) {
 			this.fields = fields;
 			try {
@@ -69,7 +72,7 @@ public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T,
 			for (int i = 0; i < fields.length; i++) {
 				outTuple.setField(inTuple.getField(fields[i]), i);
 			}
-			
+
 			return (R) outTuple;
 		}
 	}

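[Editor's sketch, not part of the commit: the projection loop inside MapProjector above, shown with concrete tuples. The field indices and values are made up.]

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class ProjectionSketch {
	public static void main(String[] args) {
		Tuple3<Integer, String, Double> in = new Tuple3<>(7, "seven", 7.0);
		int[] fields = {2, 0}; // keep field 2, then field 0

		// copy the selected input fields into the output tuple, in order
		Tuple2<Double, Integer> out = new Tuple2<>();
		for (int i = 0; i < fields.length; i++) {
			out.setField(in.getField(fields[i]), i);
		}
		System.out.println(out); // (7.0,7)
	}
}
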
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
index faeca4e..f86deb7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the right.
+ */
 @Internal
 public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
-		extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>>
-{
+		extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>> {
 
 	public PlanRightUnwrappingCoGroupOperator(
 			CoGroupFunction<I1, I2, OUT> udf,
@@ -52,7 +54,7 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				name);
 	}
 
-	public static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
+	private static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
 			extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
 			implements CoGroupFunction<I1, Tuple2<K, I2>, OUT> {
 
@@ -66,7 +68,6 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
 			this.iter2 = new TupleUnwrappingIterator<I2, K>();
 		}
 
-
 		@Override
 		public void coGroup(
 				Iterable<I1> records1,

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index e9feb61..6e6f226 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
@@ -35,35 +35,32 @@ import org.apache.flink.util.Collector;
 public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {
 
 	public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
-												TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
-	{
+												TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey) {
 		super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
 				new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-		
+
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
-		implements GroupCombineFunction<Tuple2<K, IN>, OUT>
-	{
-	
+
+	private static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+		implements GroupCombineFunction<Tuple2<K, IN>, OUT> {
+
 		private static final long serialVersionUID = 1L;
-		
-		private final TupleUnwrappingIterator<IN, K> iter; 
-		
+
+		private final TupleUnwrappingIterator<IN, K> iter;
+
 		private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new TupleUnwrappingIterator<IN, K>();
 		}
-	
-	
+
 		@Override
 		public void combine(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
 			this.wrappedFunction.combine(iter, out);
 		}
-		
+
 		@Override
 		public String toString() {
 			return this.wrappedFunction.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 8568659..33c527d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  * on the unwrapped values.
  */
 @Internal
-public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> {
+public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>, OUT>> {
 
 	public PlanUnwrappingReduceGroupOperator(
 		GroupReduceFunction<IN, OUT> udf,
@@ -41,32 +41,30 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 		String name,
 		TypeInformation<OUT> outType,
 		TypeInformation<Tuple2<K, IN>> typeInfoWithKey,
-		boolean combinable)
-	{
+		boolean combinable) {
 		super(
 			combinable ?
 				new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>(udf) :
 				new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
 			new UnaryOperatorInformation<>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-		
+
 		super.setCombinable(combinable);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
-	{
+
+	private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private TupleUnwrappingIterator<IN, K> iter;
 		private TupleWrappingCollector<IN, K> coll;
 
 		private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 
-			if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+			if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
 				throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
 			}
 
@@ -74,7 +72,6 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 			this.coll = new TupleWrappingCollector<>(this.iter);
 		}
 
-
 		@Override
 		public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
@@ -87,35 +84,33 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 
 			iter.set(values.iterator());
 			coll.set(out);
-			((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+			((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
 		}
-		
+
 		@Override
 		public String toString() {
 			return this.wrappedFunction.toString();
 		}
 	}
-	
-	public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple2<K, IN>, OUT>
-	{
-	
+
+	private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT> {
+
 		private static final long serialVersionUID = 1L;
-		
-		private final TupleUnwrappingIterator<IN, K> iter; 
-		
+
+		private final TupleUnwrappingIterator<IN, K> iter;
+
 		private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new TupleUnwrappingIterator<>();
 		}
-	
-	
+
 		@Override
 		public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
 			this.wrappedFunction.reduce(iter, out);
 		}
-		
+
 		@Override
 		public String toString() {
 			return this.wrappedFunction.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 72dc41a..b2e614e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-
 /**
  * A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only
  * on the unwrapped values.
@@ -35,16 +34,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> {
 
 	public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, Keys.SelectorFunctionKeys<T, K> key, String name,
-			TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey)
-	{
+			TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey) {
 		super(new ReduceWrapper<T, K>(udf), new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey), key.computeLogicalKeyPositions(), name);
 	}
 
-	public static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
-		implements ReduceFunction<Tuple2<K, T>>
-	{
+	private static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
+		implements ReduceFunction<Tuple2<K, T>> {
 		private static final long serialVersionUID = 1L;
-		
 
 		private ReduceWrapper(ReduceFunction<T> wrapped) {
 			super(wrapped);

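[Editor's sketch, not part of the commit: the unwrapping operators in this commit share one trick — the runtime works on (key, value) tuples while the wrapped user function only ever sees the values. Below is that idea for the simple reduce case, mirroring what ReduceWrapper.reduce(...) effectively does; the data is made up.]

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnwrappingReduceSketch {
	public static void main(String[] args) throws Exception {
		ReduceFunction<Integer> sum = new ReduceFunction<Integer>() {
			@Override
			public Integer reduce(Integer a, Integer b) {
				return a + b;
			}
		};

		// The runtime hands the operator (key, value) pairs with equal keys.
		Tuple2<String, Integer> left = new Tuple2<>("k", 3);
		Tuple2<String, Integer> right = new Tuple2<>("k", 4);

		// Reduce the unwrapped values, keep the key riding along in field 0.
		Tuple2<String, Integer> result =
				new Tuple2<>(left.f0, sum.reduce(left.f1, right.f1));
		System.out.println(result); // (k,7)
	}
}
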
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index f65f169..a2a9010 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
@@ -32,21 +32,19 @@ import org.apache.flink.util.Collector;
  * operation only on the unwrapped values.
  */
 @Internal
-public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>> {
 
 	public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
-													TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
-	{
+													TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey) {
 		super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
 				new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType),
-				groupingKey.computeLogicalKeyPositions(), 
+				groupingKey.computeLogicalKeyPositions(),
 				name);
 
 	}
 
-	public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
-			implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>
-	{
+	private static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+			implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -57,7 +55,6 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends G
 			this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
 		}
 
-
 		@Override
 		public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 8080477..7f81fee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  * operation only on the unwrapped values.
  */
 @Internal
-public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>> {
 
 	public PlanUnwrappingSortedReduceGroupOperator(
 		GroupReduceFunction<IN, OUT> udf,
@@ -42,8 +42,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		String name,
 		TypeInformation<OUT> outType,
 		TypeInformation<Tuple3<K1, K2, IN>>
-		typeInfoWithKey, boolean combinable)
-	{
+		typeInfoWithKey, boolean combinable) {
 		super(
 			combinable ?
 				new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) :
@@ -55,9 +54,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 
 	// --------------------------------------------------------------------------------------------
 
-	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
-	{
+	private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -67,7 +65,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 
-			if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+			if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
 				throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
 			}
 
@@ -75,7 +73,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 			this.coll = new Tuple3WrappingCollector<>(this.iter);
 		}
 
-
 		@Override
 		public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());
@@ -87,7 +84,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception {
 			iter.set(values.iterator());
 			coll.set(out);
-			((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+			((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
 		}
 
 		@Override
@@ -96,9 +93,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		}
 	}
 
-	public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>
-	{
+	private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -109,7 +105,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 			this.iter = new Tuple3UnwrappingIterator<>();
 		}
 
-
 		@Override
 		public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
 			iter.set(values.iterator());

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
index d8c54d6..3f6463a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.api.common.functions.CombineFunction;
@@ -31,7 +32,7 @@ import org.apache.flink.util.Preconditions;
  * and makes it look like a function that implements {@link GroupCombineFunction} and {@link GroupReduceFunction} to the runtime.
  */
 public class RichCombineToGroupCombineWrapper<IN, OUT, F extends RichGroupReduceFunction<IN, OUT> & CombineFunction<IN, IN>>
-	extends RichGroupCombineFunction<IN,IN> implements GroupReduceFunction<IN, OUT> {
+	extends RichGroupCombineFunction<IN, IN> implements GroupReduceFunction<IN, OUT> {
 
 	private final F wrappedFunction;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
index fd3b4f6..b697ac9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.TraversableOnceException;
 
+import java.util.Iterator;
+
 /**
  * An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (third field).
  * The iterator also tracks the groupKeys, as the triples flow though it.

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
index 189dcdb..57b6bc7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
 /**
- * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting
+ * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting.
  */
 @Internal
 public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.io.Serializable {
@@ -35,7 +35,6 @@ public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.
 
 	private Collector<Tuple3<K1, K2, IN>> wrappedCollector;
 
-
 	public Tuple3WrappingCollector(Tuple3UnwrappingIterator<IN, K1, K2> tui) {
 		this.tui = tui;
 		this.outTuple = new Tuple3<K1, K2, IN>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
index 2ff73ef..e39ec47 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * Joiner that unwraps values from the left set before applying the join operation.
+ *
+ * @param <I1>  type of values in the left set
+ * @param <I2>  type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K>   type of key
+ */
 @Internal
 public final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
 		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>

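As context for the new Javadoc, a conceptual sketch (illustrative names, not the actual internal class) of the unwrapping step it describes: after key extraction the left input arrives as Tuple2<key, value>, and only the value is handed to the wrapped join function.

    import org.apache.flink.api.common.functions.FlatJoinFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Drop the extracted key (f0) of the left record and forward only the
    // payload (f1) to the user's join function.
    public class LeftUnwrapSketch<I1, I2, OUT, K>
            implements FlatJoinFunction<Tuple2<K, I1>, I2, OUT> {

        private final FlatJoinFunction<I1, I2, OUT> wrapped;

        public LeftUnwrapSketch(FlatJoinFunction<I1, I2, OUT> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void join(Tuple2<K, I1> left, I2 right, Collector<OUT> out) throws Exception {
            wrapped.join(left.f1, right, out);
        }
    }

The TupleRightUnwrappingJoiner below is the symmetric variant for the right-hand input.
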
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
index c9b9c27..847acd7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * Joiner that unwraps values from the right set before applying the join operation.
+ *
+ * @param <I1>  type of values in the left set
+ * @param <I2>  type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K>   type of key
+ */
 @Internal
 public final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
 		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
index 5dbe266..16ebef8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.Iterator;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.TraversableOnceException;
 
+import java.util.Iterator;
+
 /**
  * An iterator that reads 2-tuples (key value pairs) and returns only the values (second field).
 * The iterator also tracks the keys, as the pairs flow through it.
@@ -32,16 +32,16 @@ import org.apache.flink.util.TraversableOnceException;
 public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>, java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
-	
-	private K lastKey; 
+
+	private K lastKey;
 	private Iterator<Tuple2<K, T>> iterator;
 	private boolean iteratorAvailable;
-	
+
 	public void set(Iterator<Tuple2<K, T>> iterator) {
 		this.iterator = iterator;
 		this.iteratorAvailable = true;
 	}
-	
+
 	public K getLastKey() {
 		return lastKey;
 	}
@@ -53,7 +53,7 @@ public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>,
 
 	@Override
 	public T next() {
-		Tuple2<K, T> t = iterator.next(); 
+		Tuple2<K, T> t = iterator.next();
 		this.lastKey = t.f0;
 		return t.f1;
 	}

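Since the full set/getLastKey/next contract is visible in this hunk, a small usage sketch is possible (the class is @Internal; the demo class name UnwrapDemo is illustrative only):

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class UnwrapDemo {
        public static void main(String[] args) {
            Iterator<Tuple2<String, Integer>> pairs = Arrays.asList(
                    new Tuple2<String, Integer>("a", 1),
                    new Tuple2<String, Integer>("b", 2)).iterator();

            TupleUnwrappingIterator<Integer, String> tui =
                    new TupleUnwrappingIterator<Integer, String>();
            tui.set(pairs);

            while (tui.hasNext()) {
                Integer value = tui.next();    // returns t.f1
                String key = tui.getLastKey(); // tracks t.f0 of the last record
                System.out.println(key + " -> " + value);
            }
        }
    }
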
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
index e0ee3b3..1e56dac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * Joiner that unwraps both values before applying the join operation.
+ *
+ * @param <I1>  type of values in the left set
+ * @param <I2>  type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K>   type of key
+ */
 @Internal
 public final class TupleUnwrappingJoiner<I1, I2, OUT, K>
 		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
index 4581bf2..7369804 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
@@ -23,28 +23,27 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
 /**
- * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function
+ * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function.
  */
 @Internal
 public class TupleWrappingCollector<IN, K> implements Collector<IN>, java.io.Serializable {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	private final TupleUnwrappingIterator<IN, K> tui;
 	private final Tuple2<K, IN> outTuple;
-	
+
 	private Collector<Tuple2<K, IN>> wrappedCollector;
-	
-	
+
 	public TupleWrappingCollector(TupleUnwrappingIterator<IN, K> tui) {
 		this.tui = tui;
 		this.outTuple = new Tuple2<K, IN>();
 	}
-	
+
 	public void set(Collector<Tuple2<K, IN>> wrappedCollector) {
 			this.wrappedCollector = wrappedCollector;
 	}
-		
+
 	@Override
 	public void close() {
 		this.wrappedCollector.close();

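A conceptual sketch (illustrative, not the actual internal code) of how this collector pairs each emitted value with the key tracked by the unwrapping iterator, based on the getLastKey contract shown earlier:

    import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Re-attach the current group key to every value the combine function emits.
    public class WrapSketch<IN, K> implements Collector<IN> {

        private final TupleUnwrappingIterator<IN, K> tui;
        private final Tuple2<K, IN> outTuple = new Tuple2<K, IN>();
        private Collector<Tuple2<K, IN>> wrappedCollector;

        public WrapSketch(TupleUnwrappingIterator<IN, K> tui) {
            this.tui = tui;
        }

        public void set(Collector<Tuple2<K, IN>> wrappedCollector) {
            this.wrappedCollector = wrappedCollector;
        }

        @Override
        public void collect(IN record) {
            outTuple.f0 = tui.getLastKey(); // key of the group currently combined
            outTuple.f1 = record;
            wrappedCollector.collect(outTuple);
        }

        @Override
        public void close() {
            wrappedCollector.close();
        }
    }
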
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 7d5e39b..9449237 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -19,11 +19,17 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 
+/**
+ * Mapper that extracts two keys from a value.
+ * @param <T> type of the values
+ * @param <K1> type of the first key
+ * @param <K2> type of the second key
+ */
 @Internal
 @ForwardedFields("*->2")
 public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> {
@@ -36,13 +42,11 @@ public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T,
 
 	private final Tuple3<K1, K2, T> tuple = new Tuple3<K1, K2, T>();
 
-
 	public TwoKeyExtractingMapper(KeySelector<T, K1> keySelector1, KeySelector<T, K2> keySelector2) {
 		this.keySelector1 = keySelector1;
 		this.keySelector2 = keySelector2;
 	}
 
-
 	@Override
 	public Tuple3<K1, K2, T> map(T value) throws Exception {
 

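The constructor and map signature are visible in this hunk, so a short usage sketch is possible (TwoKeyDemo and the selectors are illustrative only; the expected output assumes map fills the reused Tuple3 as (key1, key2, value)):

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class TwoKeyDemo {
        public static void main(String[] args) throws Exception {
            // Key a String by its length and by its first character.
            KeySelector<String, Integer> byLength = new KeySelector<String, Integer>() {
                @Override
                public Integer getKey(String value) {
                    return value.length();
                }
            };
            KeySelector<String, String> byPrefix = new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value.substring(0, 1);
                }
            };

            TwoKeyExtractingMapper<String, Integer, String> mapper =
                    new TwoKeyExtractingMapper<String, Integer, String>(byLength, byPrefix);

            Tuple3<Integer, String, String> result = mapper.map("foobar");
            System.out.println(result); // (6,f,foobar)
        }
    }
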
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 9851c42..6c52ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -25,9 +25,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 
+/**
+ * Wrapper around a {@link Function}, forwarding open/close and runtime context calls to it.
+ * @param <T> the type of the wrapped function
+ */
 @Internal
 public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	protected T wrappedFunction;
@@ -36,21 +40,20 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
 		this.wrappedFunction = wrappedFunction;
 	}
 
-	
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		FunctionUtils.openFunction(this.wrappedFunction, parameters);
 	}
-	
+
 	@Override
 	public void close() throws Exception {
 		FunctionUtils.closeFunction(this.wrappedFunction);
 	}
-	
+
 	@Override
 	public void setRuntimeContext(RuntimeContext t) {
 		super.setRuntimeContext(t);
-		
+
 		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
 	}
 

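A minimal sketch of a concrete subclass (WrappingMapper is an illustrative name, not part of this patch): since WrappingFunction already forwards open/close and the runtime context to the wrapped function, a subclass only has to delegate the actual work.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.operators.translation.WrappingFunction;

    public class WrappingMapper<T, O> extends WrappingFunction<MapFunction<T, O>>
            implements MapFunction<T, O> {

        private static final long serialVersionUID = 1L;

        public WrappingMapper(MapFunction<T, O> wrapped) {
            super(wrapped);
        }

        @Override
        public O map(T value) throws Exception {
            // Delegate to the wrapped function; lifecycle calls are already
            // forwarded by WrappingFunction.
            return wrappedFunction.map(value);
        }
    }
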
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 0ce79e3..2f74288 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -16,25 +16,28 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of aggregations.
+ */
 public class AggregateTranslationTest {
 
 	@Test
@@ -42,26 +45,26 @@ public class AggregateTranslationTest {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Double, StringValue, Long>> initialData = 
+			DataSet<Tuple3<Double, StringValue, Long>> initialData =
 					env.fromElements(new Tuple3<Double, StringValue, Long>(3.141592, new StringValue("foobar"), Long.valueOf(77)));
-			
+
 			initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, Long>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
+
 			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-			
+
 			// check keys
 			assertEquals(1, reducer.getKeyColumns(0).length);
 			assertEquals(0, reducer.getKeyColumns(0)[0]);
-			
+
 			assertEquals(-1, reducer.getParallelism());
 			assertTrue(reducer.isCombinable());
-			
+
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
index 887173d..9c67e60 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -31,8 +29,16 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of co-group with group sorting.
+ */
 @SuppressWarnings({"serial", "unchecked"})
 public class CoGroupSortTranslationTest implements java.io.Serializable {
 
@@ -40,35 +46,35 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 	public void testGroupSortTuples() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-			
+
 			input1.coGroup(input2)
 				.where(1).equalTo(2)
 				.sortFirstGroup(0, Order.DESCENDING)
 				.sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING)
-				
+
 				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>, Long>() {
 					@Override
 					public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple3<Long, Long, Long>> second,
 							Collector<Long> out) {}
 				})
-				
+
 				.output(new DiscardingOutputFormat<Long>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 			CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-			
+
 			assertNotNull(coGroup.getGroupOrderForInputOne());
 			assertNotNull(coGroup.getGroupOrderForInputTwo());
-			
+
 			assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
 			assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
 			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-			
+
 			assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
 			assertEquals(1, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
 			assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -80,39 +86,39 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSortTuplesAndPojos() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 			DataSet<TestPoJo> input2 = env.fromElements(new TestPoJo());
-			
+
 			input1.coGroup(input2)
 				.where(1).equalTo("b")
 				.sortFirstGroup(0, Order.DESCENDING)
 				.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-				
+
 				.with(new CoGroupFunction<Tuple2<Long, Long>, TestPoJo, Long>() {
 					@Override
 					public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
 				})
-				
+
 				.output(new DiscardingOutputFormat<Long>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 			CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-			
+
 			assertNotNull(coGroup.getGroupOrderForInputOne());
 			assertNotNull(coGroup.getGroupOrderForInputTwo());
-			
+
 			assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
 			assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
 			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-			
+
 			assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
 			assertEquals(2, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
 			assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -124,7 +130,10 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Sample test POJO.
+	 */
 	public static class TestPoJo {
 		public long a;
 		public long b;

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index fd60bc6..e4cb8c4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -16,93 +16,95 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Iterator;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
+import java.util.Iterator;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of delta iterations.
+ */
 @SuppressWarnings("serial")
 public class DeltaIterationTranslationTest implements java.io.Serializable {
 
 	@Test
 	public void testCorrectTranslation() {
 		try {
-			final String JOB_NAME = "Test JobName";
-			final String ITERATION_NAME = "Test Name";
-			
-			final String BEFORE_NEXT_WORKSET_MAP = "Some Mapper";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final int[] ITERATION_KEYS = new int[] {2};
-			final int NUM_ITERATIONS = 13;
-			
-			final int DEFAULT_parallelism= 133;
-			final int ITERATION_parallelism = 77;
-			
+			final String jobName = "Test JobName";
+			final String iterationName = "Test Name";
+
+			final String beforeNextWorksetMap = "Some Mapper";
+
+			final String aggregatorName = "AggregatorName";
+
+			final int[] iterationKeys = new int[] {2};
+			final int numIterations = 13;
+
+			final int defaultParallelism = 133;
+			final int iterationParallelism = 77;
+
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			// ------------ construct the test program ------------------
 			{
-				env.setParallelism(DEFAULT_parallelism);
-				
+				env.setParallelism(defaultParallelism);
+
 				@SuppressWarnings("unchecked")
 				DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
-	
+
 				@SuppressWarnings("unchecked")
 				DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-				
-				DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS);
-				iteration.name(ITERATION_NAME).parallelism(ITERATION_parallelism);
-				
-				iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				
+
+				DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, numIterations, iterationKeys);
+				iteration.name(iterationName).parallelism(iterationParallelism);
+
+				iteration.registerAggregator(aggregatorName, new LongSumAggregator());
+
 				// test that multiple workset consumers are supported
-				DataSet<Tuple2<Double, String>> worksetSelfJoin = 
+				DataSet<Tuple2<Double, String>> worksetSelfJoin =
 					iteration.getWorkset()
-						.map(new IdentityMapper<Tuple2<Double,String>>())
+						.map(new IdentityMapper<Tuple2<Double, String>>())
 						.join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1);
-				
+
 				DataSet<Tuple3<Double, Long, String>> joined = worksetSelfJoin.join(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetJoin());
 
 				DataSet<Tuple3<Double, Long, String>> result = iteration.closeWith(
 						joined,
-						joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP));
-				
+						joined.map(new NextWorksetMapper()).name(beforeNextWorksetMap));
+
 				result.output(new DiscardingOutputFormat<Tuple3<Double, Long, String>>());
 				result.writeAsText("/dev/null");
 			}
-			
-			
-			Plan p = env.createProgramPlan(JOB_NAME);
-			
+
+			Plan p = env.createProgramPlan(jobName);
+
 			// ------------- validate the plan ----------------
-			assertEquals(JOB_NAME, p.getJobName());
-			assertEquals(DEFAULT_parallelism, p.getDefaultParallelism());
-			
+			assertEquals(jobName, p.getJobName());
+			assertEquals(defaultParallelism, p.getDefaultParallelism());
+
 			// validate the iteration
 			GenericDataSinkBase<?> sink1, sink2;
 			{
@@ -110,23 +112,23 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 				sink1 = sinks.next();
 				sink2 = sinks.next();
 			}
-			
+
 			DeltaIterationBase<?, ?> iteration = (DeltaIterationBase<?, ?>) sink1.getInput();
-			
+
 			// check that multi consumer translation works for iterations
 			assertEquals(iteration, sink2.getInput());
-			
+
 			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations());
-			assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
+			assertEquals(numIterations, iteration.getMaximumNumberOfIterations());
+			assertArrayEquals(iterationKeys, iteration.getSolutionSetKeyFields());
+			assertEquals(iterationParallelism, iteration.getParallelism());
+			assertEquals(iterationName, iteration.getName());
+
 			MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();
 			InnerJoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
 			InnerJoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
 			MapOperatorBase<?, ?, ?> worksetMapper = (MapOperatorBase<?, ?, ?>) worksetSelfJoin.getFirstInput();
-			
+
 			assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
 			assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
 			if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
@@ -137,9 +139,9 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 				assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
 			}
 
-			assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			assertEquals(beforeNextWorksetMap, nextWorksetMapper.getName());
+
+			assertEquals(aggregatorName, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -147,20 +149,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testRejectWhenSolutionSetKeysDontMatchJoin() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
 
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-			
+
 			DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-			
+
 			try {
 				iteration.getWorkset().join(iteration.getSolutionSet()).where(1).equalTo(2);
 				fail("Accepted invalid program.");
@@ -168,7 +170,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			catch (InvalidProgramException e) {
 				// all good!
 			}
-			
+
 			try {
 				iteration.getSolutionSet().join(iteration.getWorkset()).where(2).equalTo(1);
 				fail("Accepted invalid program.");
@@ -183,20 +185,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testRejectWhenSolutionSetKeysDontMatchCoGroup() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
 
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-			
+
 			DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-			
+
 			try {
 				iteration.getWorkset().coGroup(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetCoGroup1());
 				fail("Accepted invalid program.");
@@ -204,7 +206,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			catch (InvalidProgramException e) {
 				// all good!
 			}
-			
+
 			try {
 				iteration.getSolutionSet().coGroup(iteration.getWorkset()).where(2).equalTo(1).with(new SolutionWorksetCoGroup2());
 				fail("Accepted invalid program.");
@@ -219,40 +221,40 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+	private static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
 		@Override
 		public Tuple3<Double, Long, String> join(Tuple2<Double, String> first, Tuple3<Double, Long, String> second){
 			return null;
 		}
 	}
-	
-	public static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
+
+	private static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
 		@Override
 		public Tuple2<Double, String> map(Tuple3<Double, Long, String> value) {
 			return null;
 		}
 	}
-	
-	public static class IdentityMapper<T> extends RichMapFunction<T, T> {
+
+	private static class IdentityMapper<T> extends RichMapFunction<T, T> {
 
 		@Override
 		public T map(T value) throws Exception {
 			return value;
 		}
 	}
-	
-	public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+	private static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
 
 		@Override
 		public void coGroup(Iterable<Tuple2<Double, String>> first, Iterable<Tuple3<Double, Long, String>> second,
 				Collector<Tuple3<Double, Long, String>> out) {
 		}
 	}
-	
-	public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+
+	private static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
 
 		@Override
 		public void coGroup(Iterable<Tuple3<Double, Long, String>> second, Iterable<Tuple2<Double, String>> first,

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 27c7b2f..6a98a8d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Test;
 
 import java.io.Serializable;
@@ -45,6 +46,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for translation of the distinct operation.
+ */
 @SuppressWarnings("serial")
 public class DistinctTranslationTest {
 
@@ -164,7 +168,7 @@ public class DistinctTranslationTest {
 
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 
-			initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+			initialData.distinct(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
 				public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
 					return value.f1;
 				}
@@ -183,7 +187,7 @@ public class DistinctTranslationTest {
 			assertEquals(4, reducer.getParallelism());
 
 			// check types
-			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
 					new ValueTypeInfo<StringValue>(StringValue.class),
 					initialData.getType());
 
@@ -245,7 +249,7 @@ public class DistinctTranslationTest {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+	private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
 		return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
 				.setParallelism(1);
 	}
@@ -256,6 +260,9 @@ public class DistinctTranslationTest {
 		return env.fromCollection(data);
 	}
 
+	/**
+	 * Custom data type, for testing purposes.
+	 */
 	public static class CustomType implements Serializable {
 
 		private static final long serialVersionUID = 1L;
@@ -269,7 +276,7 @@ public class DistinctTranslationTest {
 
 		@Override
 		public String toString() {
-			return ""+myInt;
+			return "" + myInt;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index 3adbbb8..486cad4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.operators.translation;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -37,10 +36,17 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of the reduce operation.
+ */
 @SuppressWarnings("serial")
 public class ReduceTranslationTests implements java.io.Serializable {
 
@@ -49,31 +55,31 @@ public class ReduceTranslationTests implements java.io.Serializable {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-			
-			initialData.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+
+			initialData.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
 				public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 					return value1;
 				}
 			}).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
+
 			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-			
+
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-			
+
 			// check keys
 			assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0);
-			
+
 			// parallelism was not configured on the operator
 			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-			
+
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {
@@ -82,40 +88,40 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			fail("Test caused an error: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void translateGroupedReduceNoMapper() {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-			
+
 			initialData
 				.groupBy(2)
-				.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+				.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
 					public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 						return value1;
 					}
 				})
 				.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
+
 			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-			
+
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-			
+
 			// parallelism was not configured on the operator
 			assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-			
+
 			// check keys
 			assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0));
-			
+
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {
@@ -124,60 +130,58 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			fail("Test caused an error: " + e.getMessage());
 		}
 	}
-	
-	
+
 	@Test
 	public void translateGroupedReduceWithkeyExtractor() {
 		try {
 			final int parallelism = 8;
 			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-			
+
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-			
+
 			initialData
-				.groupBy(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+				.groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
 					public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
 						return value.f1;
 					}
 				})
-				.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+				.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
 					public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 						return value1;
 					}
 				}).setParallelism(4)
 				.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-			
+
 			Plan p = env.createProgramPlan();
-			
+
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-			
-			
+
 			MapOperatorBase<?, ?, ?> keyProjector = (MapOperatorBase<?, ?, ?>) sink.getInput();
 			PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyProjector.getInput();
 			MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
-			
+
 			// check the parallelisms
 			assertEquals(1, keyExtractor.getParallelism());
 			assertEquals(4, reducer.getParallelism());
 			assertEquals(4, keyProjector.getParallelism());
-			
+
 			// check types
-			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
 					new ValueTypeInfo<StringValue>(StringValue.class),
 					initialData.getType());
-			
+
 			assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType());
 			assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
-			
+
 			assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
 			assertEquals(keyValueInfo, reducer.getOperatorInfo().getOutputType());
-			
+
 			assertEquals(keyValueInfo, keyProjector.getOperatorInfo().getInputType());
 			assertEquals(initialData.getType(), keyProjector.getOperatorInfo().getOutputType());
-			
+
 			// check keys
 			assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
-			
+
 			assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
 		catch (Exception e) {
@@ -186,9 +190,9 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			fail("Test caused an error: " + e.getMessage());
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
-	private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+	private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
 		return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
 				.setParallelism(1);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
index d7e42e5..3bb8556 100644
--- a/tools/maven/suppressions-java.xml
+++ b/tools/maven/suppressions-java.xml
@@ -33,14 +33,6 @@ under the License.
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 
 	<suppress
-		files="(.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
-		checks="AvoidStarImport|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
-		checks="AvoidStarImport"/>
-
-	<suppress
 		files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>