You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:45 UTC

[15/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-hadoop-compatibility

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
index 13d971c..b1135f0 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -41,6 +39,12 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * IT case for the {@link HadoopReduceCombineFunction}.
+ */
 @RunWith(Parameterized.class)
 public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {
 
@@ -68,7 +72,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 		counts.writeAsText(resultPath);
 		env.execute();
 
-		String expected = "(0,5)\n"+
+		String expected = "(0,5)\n" +
 				"(1,6)\n" +
 				"(2,6)\n" +
 				"(3,4)\n";
@@ -115,7 +119,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 		counts.writeAsText(resultPath);
 		env.execute();
 
-		String expected = "(0,5)\n"+
+		String expected = "(0,5)\n" +
 				"(1,6)\n" +
 				"(2,5)\n" +
 				"(3,5)\n";
@@ -144,7 +148,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 		env.execute();
 
 		// return expected result
-		String expected = "(0,0)\n"+
+		String expected = "(0,0)\n" +
 				"(1,0)\n" +
 				"(2,1)\n" +
 				"(3,1)\n" +
@@ -152,62 +156,71 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 
 		compareResultsByLinesInMemory(expected, resultPath);
 	}
-	
+
+	/**
+	 * A {@link Reducer} to sum counts.
+	 */
 	public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
 
 		@Override
 		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
 				throws IOException {
-			
+
 			int sum = 0;
-			while(v.hasNext()) {
+			while (v.hasNext()) {
 				sum += v.next().get();
 			}
 			out.collect(k, new IntWritable(sum));
 		}
-		
+
 		@Override
 		public void configure(JobConf arg0) { }
 
 		@Override
 		public void close() throws IOException { }
 	}
-	
+
+	/**
+	 * A {@link Reducer} that re-emits every value under a changed key (the original key modulo 4).
+	 */
 	public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
 
 		@Override
 		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
 				throws IOException {
-			while(v.hasNext()) {
+			while (v.hasNext()) {
 				out.collect(new IntWritable(k.get() % 4), v.next());
 			}
 		}
-		
+
 		@Override
 		public void configure(JobConf arg0) { }
 
 		@Override
 		public void close() throws IOException { }
 	}
-	
+
+	/**
+	 * A {@link Reducer} that counts, per key, the values starting with the prefix configured under "my.cntPrefix".
+	 */
 	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
 		private String countPrefix;
-		
+
 		@Override
 		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
 				throws IOException {
 			int commentCnt = 0;
-			while(vs.hasNext()) {
+			while (vs.hasNext()) {
 				String v = vs.next().toString();
-				if(v.startsWith(this.countPrefix)) {
+				if (v.startsWith(this.countPrefix)) {
 					commentCnt++;
 				}
 			}
 			out.collect(k, new IntWritable(commentCnt));
 		}
-		
+
 		@Override
-		public void configure(final JobConf c) { 
+		public void configure(final JobConf c) {
 			this.countPrefix = c.get("my.cntPrefix");
 		}
 
@@ -215,10 +228,13 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 		public void close() throws IOException { }
 	}
 
+	/**
+	 * Test mapper.
+	 */
 	public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
 			IntWritable>> {
 		private static final long serialVersionUID = 1L;
-		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+		Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();
 		@Override
 		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
 		throws Exception {
@@ -228,10 +244,13 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 		}
 	}
 
+	/**
+	 * Test mapper.
+	 */
 	public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
 			IntWritable>> {
 		private static final long serialVersionUID = 1L;
-		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+		Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();
 		@Override
 		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
 		throws Exception {
@@ -241,9 +260,12 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 		}
 	}
 
+	/**
+	 * Test mapper.
+	 */
 	public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
 		private static final long serialVersionUID = 1L;
-		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+		Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();
 		@Override
 		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
 		throws Exception {
@@ -253,6 +275,9 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
 		}
 	}
 
+	/**
+	 * Test mapper.
+	 */
 	public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
 		private static final long serialVersionUID = 1L;
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
index abc0e9c..3a22af0 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
@@ -18,15 +18,13 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -39,6 +37,12 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * IT cases for the {@link HadoopReduceFunction}.
+ */
 @RunWith(Parameterized.class)
 public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
 
@@ -65,7 +69,7 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
 		commentCnts.writeAsText(resultPath);
 		env.execute();
 
-		String expected = "(0,0)\n"+
+		String expected = "(0,0)\n" +
 				"(1,3)\n" +
 				"(2,5)\n" +
 				"(3,5)\n" +
@@ -113,7 +117,7 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
 		helloCnts.writeAsText(resultPath);
 		env.execute();
 
-		String expected = "(0,0)\n"+
+		String expected = "(0,0)\n" +
 				"(1,0)\n" +
 				"(2,1)\n" +
 				"(3,1)\n" +
@@ -121,69 +125,78 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
 
 		compareResultsByLinesInMemory(expected, resultPath);
 	}
-	
+
+	/**
+	 * A {@link Reducer} that counts, per key, the values starting with "Comment".
+	 */
 	public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		
+
 		@Override
 		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
 				throws IOException {
 			int commentCnt = 0;
-			while(vs.hasNext()) {
+			while (vs.hasNext()) {
 				String v = vs.next().toString();
-				if(v.startsWith("Comment")) {
+				if (v.startsWith("Comment")) {
 					commentCnt++;
 				}
 			}
 			out.collect(k, new IntWritable(commentCnt));
 		}
-		
+
 		@Override
 		public void configure(final JobConf arg0) { }
 
 		@Override
 		public void close() throws IOException { }
 	}
-	
+
+	/**
+	 * A {@link Reducer} that counts the values starting with "Comment" and emits the count under the fixed key 42.
+	 */
 	public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		
+
 		@Override
 		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
 				throws IOException {
 			int commentCnt = 0;
-			while(vs.hasNext()) {
+			while (vs.hasNext()) {
 				String v = vs.next().toString();
-				if(v.startsWith("Comment")) {
+				if (v.startsWith("Comment")) {
 					commentCnt++;
 				}
 			}
 			out.collect(new IntWritable(42), new IntWritable(commentCnt));
 		}
-		
+
 		@Override
 		public void configure(final JobConf arg0) { }
 
 		@Override
 		public void close() throws IOException { }
 	}
-	
+
+	/**
+	 * A {@link Reducer} that counts, per key, the values starting with the prefix configured under "my.cntPrefix".
+	 */
 	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
 		private String countPrefix;
-		
+
 		@Override
 		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
 				throws IOException {
 			int commentCnt = 0;
-			while(vs.hasNext()) {
+			while (vs.hasNext()) {
 				String v = vs.next().toString();
-				if(v.startsWith(this.countPrefix)) {
+				if (v.startsWith(this.countPrefix)) {
 					commentCnt++;
 				}
 			}
 			out.collect(k, new IntWritable(commentCnt));
 		}
-		
+
 		@Override
-		public void configure(final JobConf c) { 
+		public void configure(final JobConf c) {
 			this.countPrefix = c.get("my.cntPrefix");
 		}
 
@@ -191,6 +204,9 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
 		public void close() throws IOException { }
 	}
 
+	/**
+	 * Test mapper.
+	 */
 	public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
 		private static final long serialVersionUID = 1L;
 		@Override
@@ -201,6 +217,9 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
 		}
 	}
 
+	/**
+	 * Test mapper.
+	 */
 	public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
 		private static final long serialVersionUID = 1L;
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
index eed6f8f..b1992ff 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
@@ -18,45 +18,49 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test data for the Hadoop compatibility tests: a shuffled {@link DataSet} of 21 (IntWritable, Text) pairs.
+ */
 public class HadoopTestData {
 
 	public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>();
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine.")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15")));
-		
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(1), new Text("Hi")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(2), new Text("Hello")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(3), new Text("Hello world")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(4), new Text("Hello world, how are you?")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(5), new Text("I am fine.")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(6), new Text("Luke Skywalker")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(7), new Text("Comment#1")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(8), new Text("Comment#2")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(9), new Text("Comment#3")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(10), new Text("Comment#4")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(11), new Text("Comment#5")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(12), new Text("Comment#6")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(13), new Text("Comment#7")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(14), new Text("Comment#8")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(15), new Text("Comment#9")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(16), new Text("Comment#10")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(17), new Text("Comment#11")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(18), new Text("Comment#12")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(19), new Text("Comment#13")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(20), new Text("Comment#14")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(21), new Text("Comment#15")));
+
 		Collections.shuffle(data);
-		
+
 		return env.fromCollection(data);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index ce0143a..2bf69bd 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -18,16 +18,14 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred.example;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -39,95 +37,101 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 
-
+import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * Implements a word count which takes the input file and counts the number of
  * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
+ *
+ * <p>This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
  * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
  */
 public class HadoopMapredCompatWordCount {
-	
+
 	public static void main(String[] args) throws Exception {
 		if (args.length < 2) {
 			System.err.println("Usage: WordCount <input path> <result path>");
 			return;
 		}
-		
+
 		final String inputPath = args[0];
 		final String outputPath = args[1];
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// Set up the Hadoop Input Format
 		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
 		TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
-		
+
 		// Create a Flink job with it
 		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-		
-		DataSet<Tuple2<Text, LongWritable>> words = 
+
+		DataSet<Tuple2<Text, LongWritable>> words =
 				text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
 					.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
-		
+
 		// Set up Hadoop Output Format
-		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
 				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
 		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
 		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
-		
+
 		// Output & Execute
 		words.output(hadoopOutputFormat).setParallelism(1);
 		env.execute("Hadoop Compat WordCount");
 	}
-	
-	
+
+	/**
+	 * A {@link Mapper} that splits a line into words.
+	 */
 	public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
 
 		@Override
-		public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) 
+		public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
 				throws IOException {
 			// normalize and split the line
 			String line = v.toString();
 			String[] tokens = line.toLowerCase().split("\\W+");
-			
+
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
-					out.collect(new Text(token), new LongWritable(1l));
+					out.collect(new Text(token), new LongWritable(1L));
 				}
 			}
 		}
-		
+
 		@Override
 		public void configure(JobConf arg0) { }
-		
+
 		@Override
 		public void close() throws IOException { }
-		
+
 	}
-	
+
+	/**
+	 * A {@link Reducer} to sum counts.
+	 */
 	public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
 
 		@Override
 		public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
 				throws IOException {
-			
+
 			long cnt = 0;
-			while(vs.hasNext()) {
+			while (vs.hasNext()) {
 				cnt += vs.next().get();
 			}
 			out.collect(k, new LongWritable(cnt));
-			
+
 		}
-		
+
 		@Override
 		public void configure(JobConf arg0) { }
-		
+
 		@Override
 		public void close() throws IOException { }
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
index 524318c..ff7c1b7 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -18,43 +18,47 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
 
-import java.util.ArrayList;
-import java.util.NoSuchElementException;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+
 import org.apache.hadoop.io.IntWritable;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+/**
+ * Tests for the {@link HadoopTupleUnwrappingIterator}.
+ */
 public class HadoopTupleUnwrappingIteratorTest {
 
 	@Test
 	public void testValueIterator() {
-		
-		HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
+
+		HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
 				new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(new WritableSerializer
 						<IntWritable>(IntWritable.class));
-		
+
 		// many values
-		
+
 		ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8)));
-		
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(2)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(3)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(4)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(5)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(6)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(7)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(8)));
+
 		int expectedKey = 1;
-		int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
-		
+		int[] expectedValues = new int[] {1, 2, 3, 4, 5, 6, 7, 8};
+
 		valIt.set(tList.iterator());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
+		for (int expectedValue : expectedValues) {
 			Assert.assertTrue(valIt.hasNext());
 			Assert.assertTrue(valIt.hasNext());
 			Assert.assertTrue(valIt.next().get() == expectedValue);
@@ -63,18 +67,18 @@ public class HadoopTupleUnwrappingIteratorTest {
 		Assert.assertFalse(valIt.hasNext());
 		Assert.assertFalse(valIt.hasNext());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
+
 		// one value
-		
+
 		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10)));
-		
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(10)));
+
 		expectedKey = 2;
 		expectedValues = new int[]{10};
-		
+
 		valIt.set(tList.iterator());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
+		for (int expectedValue : expectedValues) {
 			Assert.assertTrue(valIt.hasNext());
 			Assert.assertTrue(valIt.hasNext());
 			Assert.assertTrue(valIt.next().get() == expectedValue);
@@ -83,23 +87,23 @@ public class HadoopTupleUnwrappingIteratorTest {
 		Assert.assertFalse(valIt.hasNext());
 		Assert.assertFalse(valIt.hasNext());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
+
 		// more values
-		
+
 		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21)));
-		
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(10)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(4)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(7)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(9)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(21)));
+
 		expectedKey = 3;
-		expectedValues = new int[]{10,4,7,9,21};
-		
+		expectedValues = new int[]{10, 4, 7, 9, 21};
+
 		valIt.set(tList.iterator());
 		Assert.assertTrue(valIt.hasNext());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
+		for (int expectedValue : expectedValues) {
 			Assert.assertTrue(valIt.hasNext());
 			Assert.assertTrue(valIt.hasNext());
 			Assert.assertTrue(valIt.next().get() == expectedValue);
@@ -108,22 +112,22 @@ public class HadoopTupleUnwrappingIteratorTest {
 		Assert.assertFalse(valIt.hasNext());
 		Assert.assertFalse(valIt.hasNext());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
+
 		// no has next calls
-		
+
 		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0)));
-		
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(5)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(8)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(42)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(-1)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(0)));
+
 		expectedKey = 4;
-		expectedValues = new int[]{5,8,42,-1,0};
-		
+		expectedValues = new int[]{5, 8, 42, -1, 0};
+
 		valIt.set(tList.iterator());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
+		for (int expectedValue : expectedValues) {
 			Assert.assertTrue(valIt.next().get() == expectedValue);
 		}
 		try {
@@ -135,5 +139,5 @@ public class HadoopTupleUnwrappingIteratorTest {
 		Assert.assertFalse(valIt.hasNext());
 		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 48aa258..a23a50d 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -18,36 +18,42 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapreduce;
 
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.OperatingSystem;
+
 import org.junit.Assume;
 import org.junit.Before;
 
+/**
+ * IT cases for both the {@link HadoopInputFormat} and {@link HadoopOutputFormat}.
+ */
 public class HadoopInputOutputITCase extends JavaProgramTestBase {
-	
+
 	protected String textPath;
 	protected String resultPath;
-	
+
 	@Before
 	public void checkOperatingSystem() {
 		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
 		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
 	}
-	
+
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
 		this.setParallelism(4);
 	}
-	
+
 	@Override
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
 	}
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		WordCount.main(new String[] { textPath, resultPath });

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
index ed83d78..09af3df 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -18,11 +18,16 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapreduce.example;
 
-import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -30,71 +35,67 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 
 /**
  * Implements a word count which takes the input file and counts the number of
  * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
+ *
+ * <p>This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
  * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
  */
 @SuppressWarnings("serial")
 public class WordCount {
-	
+
 	public static void main(String[] args) throws Exception {
 		if (args.length < 2) {
 			System.err.println("Usage: WordCount <input path> <result path>");
 			return;
 		}
-		
+
 		final String inputPath = args[0];
 		final String outputPath = args[1];
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// Set up the Hadoop Input Format
 		Job job = Job.getInstance();
 		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
 		TextInputFormat.addInputPath(job, new Path(inputPath));
-		
+
 		// Create a Flink job with it
 		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-		
+
 		// Tokenize the line and convert from Writable "Text" to String for better handling
 		DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
-		
+
 		// Sum up the words
 		DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
-		
+
 		// Convert String back to Writable "Text" for use with Hadoop Output Format
 		DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
-		
+
 		// Set up Hadoop Output Format
 		HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
 		hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
 		hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
 		TextOutputFormat.setOutputPath(job, new Path(outputPath));
-		
+
 		// Output & Execute
 		hadoopResult.output(hadoopOutputFormat);
 		env.execute("Word Count");
 	}
-	
+
 	/**
 	 * Splits a line into words and converts Hadoop Writables into normal Java data types.
 	 */
 	public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-		
+
 		@Override
 		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
 			// normalize and split the line
 			String line = value.f1.toString();
 			String[] tokens = line.toLowerCase().split("\\W+");
-			
+
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
@@ -103,17 +104,17 @@ public class WordCount {
 			}
 		}
 	}
-	
+
 	/**
 	 * Converts Java data types to Hadoop Writables.
 	 */
 	public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-		
+
 		@Override
 		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
 			return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
 		}
-		
+
 	}
-	
+
 }