You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:46 UTC

[16/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-hadoop-compatibility

[FLINK-6711] Activate strict checkstyle for flink-hadoop-compatibility


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab8fe57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab8fe57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab8fe57

Branch: refs/heads/master
Commit: fab8fe57ca7808a8c7dfaee1834a0429217942f2
Parents: b12de1e
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:56:53 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:21 2017 +0200

----------------------------------------------------------------------
 .../flink-hadoop-compatibility/pom.xml          |   7 +-
 .../api/java/typeutils/WritableTypeInfo.java    |  23 +++--
 .../typeutils/runtime/WritableComparator.java   |  65 ++++++------
 .../typeutils/runtime/WritableSerializer.java   |  54 +++++-----
 .../flink/hadoopcompatibility/HadoopInputs.java |  28 +++---
 .../flink/hadoopcompatibility/HadoopUtils.java  |   6 +-
 .../mapred/HadoopMapFunction.java               |  59 +++++------
 .../mapred/HadoopReduceCombineFunction.java     |  75 +++++++-------
 .../mapred/HadoopReduceFunction.java            |  67 +++++++------
 .../mapred/wrapper/HadoopOutputCollector.java   |  15 +--
 .../wrapper/HadoopTupleUnwrappingIterator.java  |  32 +++---
 .../scala/HadoopInputs.scala                    |   6 +-
 .../java/typeutils/WritableExtractionTest.java  |  38 ++++---
 .../java/typeutils/WritableInfoParserTest.java  |  10 +-
 .../java/typeutils/WritableTypeInfoTest.java    |  10 +-
 .../typeutils/runtime/StringArrayWritable.java  |  36 ++++---
 .../runtime/WritableComparatorTest.java         |  25 +++--
 .../runtime/WritableComparatorUUIDTest.java     |   3 +
 .../api/java/typeutils/runtime/WritableID.java  |   4 +
 .../runtime/WritableSerializerTest.java         |  28 +++---
 .../runtime/WritableSerializerUUIDTest.java     |   3 +
 .../hadoopcompatibility/HadoopUtilsTest.java    |   4 +
 .../mapred/HadoopMapFunctionITCase.java         |  45 +++++----
 .../mapred/HadoopMapredITCase.java              |   8 +-
 .../HadoopReduceCombineFunctionITCase.java      |  69 +++++++++----
 .../mapred/HadoopReduceFunctionITCase.java      |  61 +++++++----
 .../mapred/HadoopTestData.java                  |  60 +++++------
 .../example/HadoopMapredCompatWordCount.java    |  70 +++++++------
 .../HadoopTupleUnwrappingIteratorTest.java      | 100 ++++++++++---------
 .../mapreduce/HadoopInputOutputITCase.java      |  16 ++-
 .../mapreduce/example/WordCount.java            |  49 ++++-----
 31 files changed, 604 insertions(+), 472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml b/flink-connectors/flink-hadoop-compatibility/pom.xml
index 2dee17d..9427e43 100644
--- a/flink-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -19,9 +19,9 @@ under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
+
 	<modelVersion>4.0.0</modelVersion>
-	
+
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connectors</artifactId>
@@ -82,9 +82,8 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-		
-	</dependencies>
 
+	</dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 7bcb4bf..cde309b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
 import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+
 import org.apache.hadoop.io.Writable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -41,9 +42,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Public
 public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private final Class<T> typeClass;
 
 	@PublicEvolving
@@ -59,11 +60,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 	@Override
 	@PublicEvolving
 	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-		if(Comparable.class.isAssignableFrom(typeClass)) {
+		if (Comparable.class.isAssignableFrom(typeClass)) {
 			return new WritableComparator(sortOrderAscending, typeClass);
 		}
 		else {
-			throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
+			throw new UnsupportedOperationException("Cannot create Comparator for " + typeClass.getCanonicalName() + ". " +
 													"Class does not implement Comparable interface.");
 		}
 	}
@@ -85,7 +86,7 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 	public int getArity() {
 		return 1;
 	}
-	
+
 	@Override
 	@PublicEvolving
 	public int getTotalFields() {
@@ -109,17 +110,17 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
 		return new WritableSerializer<T>(typeClass);
 	}
-	
+
 	@Override
 	public String toString() {
 		return "WritableType<" + typeClass.getName() + ">";
-	}	
-	
+	}
+
 	@Override
 	public int hashCode() {
 		return typeClass.hashCode();
 	}
-	
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof WritableTypeInfo) {
@@ -138,7 +139,7 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 	public boolean canEqual(Object obj) {
 		return obj instanceof WritableTypeInfo;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@PublicEvolving
@@ -150,5 +151,5 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 			throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index 3a95d94..083a56f 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -18,30 +18,35 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import com.esotericsoftware.kryo.Kryo;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.types.NormalizableKey;
 import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
 import org.apache.hadoop.io.Writable;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.IOException;
 
+/**
+ * A {@link TypeComparator} for {@link Writable}.
+ * @param <T>
+ */
 public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private Class<T> type;
-	
+
 	private final boolean ascendingComparison;
-	
+
 	private transient T reference;
-	
+
 	private transient T tempReference;
-	
+
 	private transient Kryo kryo;
 
 	@SuppressWarnings("rawtypes")
@@ -51,78 +56,78 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 		this.type = type;
 		this.ascendingComparison = ascending;
 	}
-	
+
 	@Override
 	public int hash(T record) {
 		return record.hashCode();
 	}
-	
+
 	@Override
 	public void setReference(T toCompare) {
 		checkKryoInitialized();
 
 		reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
 	}
-	
+
 	@Override
 	public boolean equalToReference(T candidate) {
 		return candidate.equals(reference);
 	}
-	
+
 	@Override
 	public int compareToReference(TypeComparator<T> referencedComparator) {
 		T otherRef = ((WritableComparator<T>) referencedComparator).reference;
 		int comp = otherRef.compareTo(reference);
 		return ascendingComparison ? comp : -comp;
 	}
-	
+
 	@Override
 	public int compare(T first, T second) {
 		int comp = first.compareTo(second);
 		return ascendingComparison ? comp : -comp;
 	}
-	
+
 	@Override
 	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		ensureReferenceInstantiated();
 		ensureTempReferenceInstantiated();
-		
+
 		reference.readFields(firstSource);
 		tempReference.readFields(secondSource);
-		
+
 		int comp = reference.compareTo(tempReference);
 		return ascendingComparison ? comp : -comp;
 	}
-	
+
 	@Override
 	public boolean supportsNormalizedKey() {
 		return NormalizableKey.class.isAssignableFrom(type);
 	}
-	
+
 	@Override
 	public int getNormalizeKeyLen() {
 		ensureReferenceInstantiated();
-		
+
 		NormalizableKey<?> key = (NormalizableKey<?>) reference;
 		return key.getMaxNormalizedKeyLen();
 	}
-	
+
 	@Override
 	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
 		return keyBytes < getNormalizeKeyLen();
 	}
-	
+
 	@Override
 	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
 		NormalizableKey<?> key = (NormalizableKey<?>) record;
 		key.copyNormalizedKey(target, offset, numBytes);
 	}
-	
+
 	@Override
 	public boolean invertNormalizedKey() {
 		return !ascendingComparison;
 	}
-	
+
 	@Override
 	public TypeComparator<T> duplicate() {
 		return new WritableComparator<T>(ascendingComparison, type);
@@ -139,28 +144,28 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	public TypeComparator[] getFlatComparators() {
 		return comparators;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// unsupported normalization
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public boolean supportsSerializationWithKeyNormalization() {
 		return false;
 	}
-	
+
 	@Override
 	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
 		throw new UnsupportedOperationException();
 	}
-	
+
 	@Override
 	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
 		throw new UnsupportedOperationException();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
@@ -173,13 +178,13 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 			this.kryo.register(type);
 		}
 	}
-	
+
 	private void ensureReferenceInstantiated() {
 		if (reference == null) {
 			reference = InstantiationUtil.instantiate(type, Writable.class);
 		}
 	}
-	
+
 	private void ensureTempReferenceInstantiated() {
 		if (tempReference == null) {
 			tempReference = InstantiationUtil.instantiate(type, Writable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 421d7a3..161e65b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-
-import com.esotericsoftware.kryo.Kryo;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
@@ -28,98 +26,102 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.IOException;
 
+/**
+ * A {@link TypeSerializer} for {@link Writable}.
+ * @param <T>
+ */
 @Internal
 public final class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private final Class<T> typeClass;
-	
+
 	private transient Kryo kryo;
-	
+
 	private transient T copyInstance;
-	
+
 	public WritableSerializer(Class<T> typeClass) {
 		this.typeClass = typeClass;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public T createInstance() {
-		if(typeClass == NullWritable.class) {
+		if (typeClass == NullWritable.class) {
 			return (T) NullWritable.get();
 		}
 		return InstantiationUtil.instantiate(typeClass);
 	}
 
-
-	
 	@Override
 	public T copy(T from) {
 		checkKryoInitialized();
 
 		return KryoUtils.copy(from, kryo, this);
 	}
-	
+
 	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
 
 		return KryoUtils.copy(from, reuse, kryo, this);
 	}
-	
+
 	@Override
 	public int getLength() {
 		return -1;
 	}
-	
+
 	@Override
 	public void serialize(T record, DataOutputView target) throws IOException {
 		record.write(target);
 	}
-	
+
 	@Override
 	public T deserialize(DataInputView source) throws IOException {
 		return deserialize(createInstance(), source);
 	}
-	
+
 	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		reuse.readFields(source);
 		return reuse;
 	}
-	
+
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		ensureInstanceInstantiated();
 		copyInstance.readFields(source);
 		copyInstance.write(target);
 	}
-	
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
 	}
-	
+
 	@Override
 	public WritableSerializer<T> duplicate() {
 		return new WritableSerializer<T>(typeClass);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private void ensureInstanceInstantiated() {
 		if (copyInstance == null) {
 			copyInstance = createInstance();
 		}
 	}
-	
+
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
@@ -133,12 +135,12 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
 		}
 	}
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public int hashCode() {
 		return this.typeClass.hashCode();
 	}
-	
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof WritableSerializer) {
@@ -175,6 +177,10 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
 		}
 	}
 
+	/**
+	 * The config snapshot for this serializer.
+	 * @param <T>
+	 */
 	public static final class WritableSerializerConfigSnapshot<T extends Writable>
 			extends GenericTypeSerializerConfigSnapshot<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
index 9e8a3e4..dd5a74f 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -20,6 +20,7 @@ package org.apache.flink.hadoopcompatibility;
 
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -28,10 +29,10 @@ import java.io.IOException;
 /**
  * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
  *
- * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
+ * <p>It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
  * and {@link org.apache.hadoop.mapreduce.InputFormat}.
  *
- * Key value pairs produced by the Hadoop InputFormats are converted into Flink
+ * <p>Key value pairs produced by the Hadoop InputFormats are converted into Flink
  * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
  * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
  * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
@@ -46,7 +47,7 @@ public final class HadoopInputs {
 	 *
 	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
 	 */
-	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+	public static <K, V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
 		// set input path in JobConf
 		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
 		// return wrapping InputFormat
@@ -58,7 +59,7 @@ public final class HadoopInputs {
 	 *
 	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
 	 */
-	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+	public static <K, V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
 		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
 	}
 
@@ -67,7 +68,7 @@ public final class HadoopInputs {
 	 *
 	 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
 	 */
-	public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+	public static <K, V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
 	}
 
@@ -76,7 +77,7 @@ public final class HadoopInputs {
 	 *
 	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
 	 */
-	public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+	public static <K, V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
 		return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
 	}
 
@@ -85,9 +86,8 @@ public final class HadoopInputs {
 	 *
 	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
 	 */
-	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
-			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
-	{
+	public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
 		// set input path in Job
 		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
 		// return wrapping InputFormat
@@ -99,9 +99,8 @@ public final class HadoopInputs {
 	 *
 	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
 	 */
-	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
-			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
-	{
+	public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
 	}
 
@@ -110,9 +109,8 @@ public final class HadoopInputs {
 	 *
 	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
 	 */
-	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
-			org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
-	{
+	public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
+			org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
 		return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
index 97ca329..738e2f8 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.hadoopcompatibility;
 
-import org.apache.commons.cli.Option;
 import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.apache.commons.cli.Option;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 import java.io.IOException;
@@ -31,7 +32,7 @@ import java.util.Map;
  */
 public class HadoopUtils {
 	/**
-	 * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}
+	 * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}.
 	 *
 	 * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
 	 * @return A {@link ParameterTool}
@@ -49,4 +50,3 @@ public class HadoopUtils {
 	}
 }
 
-

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
index ba8aa90..5b679fe 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -18,68 +18,69 @@
 
 package org.apache.flink.hadoopcompatibility.mapred;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
 /**
- * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
  */
 @SuppressWarnings("rawtypes")
 @Public
-public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
-					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+					extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
+					implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
+	private transient Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;
 	private transient JobConf jobConf;
 
-	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
+	private transient HadoopOutputCollector<KEYOUT, VALUEOUT> outputCollector;
 	private transient Reporter reporter;
-	
+
 	/**
 	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
-	 * 
+	 *
 	 * @param hadoopMapper The Hadoop Mapper to wrap.
 	 */
 	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
 		this(hadoopMapper, new JobConf());
 	}
-	
+
 	/**
 	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
 	 * The Hadoop Mapper is configured with the provided JobConf.
-	 * 
+	 *
 	 * @param hadoopMapper The Hadoop Mapper to wrap.
 	 * @param conf The JobConf that is used to configure the Hadoop Mapper.
 	 */
 	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
-		if(hadoopMapper == null) {
+		if (hadoopMapper == null) {
 			throw new NullPointerException("Mapper may not be null.");
 		}
-		if(conf == null) {
+		if (conf == null) {
 			throw new NullPointerException("JobConf may not be null.");
 		}
-		
+
 		this.mapper = hadoopMapper;
 		this.jobConf = conf;
 	}
@@ -88,13 +89,13 @@ public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		this.mapper.configure(jobConf);
-		
+
 		this.reporter = new HadoopDummyReporter();
 		this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
 	}
 
 	@Override
-	public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
+	public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
 			throws Exception {
 		outputCollector.setFlinkCollector(out);
 		mapper.map(value.f0, value.f1, outputCollector, reporter);
@@ -102,15 +103,15 @@ public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {	
+	public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
-		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
-		
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
+
 		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
 		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
+		return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
-	
+
 	/**
 	 * Custom serialization methods.
 	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
@@ -122,10 +123,10 @@ public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 	@SuppressWarnings("unchecked")
 	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-		Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = 
-				(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperClass =
+				(Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
 		mapper = InstantiationUtil.instantiate(mapperClass);
-		
+
 		jobConf = new JobConf();
 		jobConf.readFields(in);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index c1acc2b..fd0d37d 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -18,81 +18,82 @@
 
 package org.apache.flink.hadoopcompatibility.mapred;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
 /**
  * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
  */
 @SuppressWarnings("rawtypes")
 @Public
-public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-	extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
-	implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
-				ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+	extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
+	implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
+				ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
-	private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+	private transient Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
+	private transient Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> combiner;
 	private transient JobConf jobConf;
-	
+
 	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
-	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
-	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+	private transient HadoopOutputCollector<KEYOUT, VALUEOUT> reduceCollector;
+	private transient HadoopOutputCollector<KEYIN, VALUEIN> combineCollector;
 	private transient Reporter reporter;
 
 	/**
 	 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
-	 * 
+	 *
 	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 	 */
 	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+										Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner) {
 		this(hadoopReducer, hadoopCombiner, new JobConf());
 	}
-	
+
 	/**
 	 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
-	 * 
+	 *
 	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 	 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 	 */
 	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
-		if(hadoopReducer == null) {
+								Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner, JobConf conf) {
+		if (hadoopReducer == null) {
 			throw new NullPointerException("Reducer may not be null.");
 		}
-		if(hadoopCombiner == null) {
+		if (hadoopCombiner == null) {
 			throw new NullPointerException("Combiner may not be null.");
 		}
-		if(conf == null) {
+		if (conf == null) {
 			throw new NullPointerException("JobConf may not be null.");
 		}
-		
+
 		this.reducer = hadoopReducer;
 		this.combiner = hadoopCombiner;
 		this.jobConf = conf;
@@ -104,7 +105,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 		super.open(parameters);
 		this.reducer.configure(jobConf);
 		this.combiner.configure(jobConf);
-		
+
 		this.reporter = new HadoopDummyReporter();
 		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
 		TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
@@ -114,7 +115,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	}
 
 	@Override
-	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+	public void reduce(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
 			throws Exception {
 		reduceCollector.setFlinkCollector(out);
 		valueIterator.set(values.iterator());
@@ -122,7 +123,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	}
 
 	@Override
-	public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+	public void combine(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYIN, VALUEIN>> out) throws Exception {
 		combineCollector.setFlinkCollector(out);
 		valueIterator.set(values.iterator());
 		combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
@@ -130,9 +131,9 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+	public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
-		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
 
 		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
 		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
@@ -144,7 +145,7 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
 	 */
 	private void writeObject(final ObjectOutputStream out) throws IOException {
-		
+
 		out.writeObject(reducer.getClass());
 		out.writeObject(combiner.getClass());
 		jobConf.write(out);
@@ -152,15 +153,15 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 	@SuppressWarnings("unchecked")
 	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-		
-		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
-				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+
+		Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
+				(Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
 		reducer = InstantiationUtil.instantiate(reducerClass);
-		
-		Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
-				(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+
+		Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>> combinerClass =
+				(Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>>) in.readObject();
 		combiner = InstantiationUtil.instantiate(combinerClass);
-		
+
 		jobConf = new JobConf();
 		jobConf.readFields(in);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index 55aea24..fadd0b2 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -18,70 +18,71 @@
 
 package org.apache.flink.hadoopcompatibility.mapred;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
 /**
- * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
  */
 @SuppressWarnings("rawtypes")
 @Public
-public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
-					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+					extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
+					implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	private transient Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
 	private transient JobConf jobConf;
-	
+
 	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
-	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient HadoopOutputCollector<KEYOUT, VALUEOUT> reduceCollector;
 	private transient Reporter reporter;
-	
+
 	/**
 	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- 	 * 
+ 	 *
 	 * @param hadoopReducer The Hadoop Reducer to wrap.
 	 */
 	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
 		this(hadoopReducer, new JobConf());
 	}
-	
+
 	/**
 	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- 	 * 
+ 	 *
 	 * @param hadoopReducer The Hadoop Reducer to wrap.
 	 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 	 */
 	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
-		if(hadoopReducer == null) {
+		if (hadoopReducer == null) {
 			throw new NullPointerException("Reducer may not be null.");
 		}
-		if(conf == null) {
+		if (conf == null) {
 			throw new NullPointerException("JobConf may not be null.");
 		}
-		
+
 		this.reducer = hadoopReducer;
 		this.jobConf = conf;
 	}
@@ -91,7 +92,7 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		this.reducer.configure(jobConf);
-		
+
 		this.reporter = new HadoopDummyReporter();
 		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
 		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
@@ -100,9 +101,9 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	}
 
 	@Override
-	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+	public void reduce(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
 			throws Exception {
-		
+
 		reduceCollector.setFlinkCollector(out);
 		valueIterator.set(values.iterator());
 		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
@@ -110,32 +111,32 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+	public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
-		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
 
 		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
 		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
+		return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 
 	/**
-	 * Custom serialization methods
+	 * Custom serialization methods.
 	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
 	 */
 	private void writeObject(final ObjectOutputStream out) throws IOException {
-		
+
 		out.writeObject(reducer.getClass());
-		jobConf.write(out);		
+		jobConf.write(out);
 	}
 
 	@SuppressWarnings("unchecked")
 	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-		
-		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
-				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+
+		Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
+				(Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
 		reducer = InstantiationUtil.instantiate(reducerClass);
-		
+
 		jobConf = new JobConf();
 		jobConf.readFields(in);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
index bfe03d3..ff9e686 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
+
 import org.apache.hadoop.mapred.OutputCollector;
 
 import java.io.IOException;
@@ -28,24 +29,24 @@ import java.io.IOException;
  * A Hadoop OutputCollector that wraps a Flink OutputCollector.
  * On each call of collect() the data is forwarded to the wrapped Flink collector.
  */
-public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
+public final class HadoopOutputCollector<KEY, VALUE> implements OutputCollector<KEY, VALUE> {
 
-	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+	private Collector<Tuple2<KEY, VALUE>> flinkCollector;
 
-	private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+	private final Tuple2<KEY, VALUE> outTuple = new Tuple2<KEY, VALUE>();
 
 	/**
 	 * Set the wrapped Flink collector.
-	 * 
+	 *
 	 * @param flinkCollector The wrapped Flink OutputCollector.
 	 */
 	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
 		this.flinkCollector = flinkCollector;
 	}
-	
+
 	/**
-	 * Use the wrapped Flink collector to collect a key-value pair for Flink. 
-	 * 
+	 * Use the wrapped Flink collector to collect a key-value pair for Flink.
+	 *
 	 * @param key the key to collect
 	 * @param val the value to collect
 	 * @throws IOException unexpected of key or value in key-value pair.

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
index 2d204b8..c58b5df 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -18,26 +18,26 @@
 
 package org.apache.flink.hadoopcompatibility.mapred.wrapper;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import java.util.Iterator;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
  */
-public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
+public class HadoopTupleUnwrappingIterator<KEY, VALUE>
 		extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
 
 	private final TypeSerializer<KEY> keySerializer;
 
-	private transient Iterator<Tuple2<KEY,VALUE>> iterator;
-	
+	private transient Iterator<Tuple2<KEY, VALUE>> iterator;
+
 	private transient KEY curKey;
 	private transient VALUE firstValue;
 	private transient boolean atFirst;
@@ -45,16 +45,16 @@ public class HadoopTupleUnwrappingIterator<KEY,VALUE>
 	public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
 		this.keySerializer = checkNotNull(keySerializer);
 	}
-	
+
 	/**
 	 * Set the Flink iterator to wrap.
-	 * 
+	 *
 	 * @param iterator The Flink iterator to wrap.
 	 */
 	@Override
-	public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+	public void set(final Iterator<Tuple2<KEY, VALUE>> iterator) {
 		this.iterator = iterator;
-		if(this.hasNext()) {
+		if (this.hasNext()) {
 			final Tuple2<KEY, VALUE> tuple = iterator.next();
 			this.curKey = keySerializer.copy(tuple.f0);
 			this.firstValue = tuple.f1;
@@ -63,30 +63,30 @@ public class HadoopTupleUnwrappingIterator<KEY,VALUE>
 			this.atFirst = false;
 		}
 	}
-	
+
 	@Override
 	public boolean hasNext() {
-		if(this.atFirst) {
+		if (this.atFirst) {
 			return true;
 		}
 		return iterator.hasNext();
 	}
-	
+
 	@Override
 	public VALUE next() {
-		if(this.atFirst) {
+		if (this.atFirst) {
 			this.atFirst = false;
 			return firstValue;
 		}
-		
+
 		final Tuple2<KEY, VALUE> tuple = iterator.next();
 		return tuple.f1;
 	}
-	
+
 	public KEY getCurrentKey() {
 		return this.curKey;
 	}
-	
+
 	@Override
 	public void remove() {
 		throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
index 133a5f4..a59af64 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.hadoopcompatibility.scala
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.hadoop.mapreduce
-import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
 import org.apache.hadoop.fs.{Path => HadoopPath}
 import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
@@ -81,7 +81,7 @@ object HadoopInputs {
       key,
       value,
       inputPath
-    )
+   )
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
index 2aefd9f..1fb3407 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparator;
-
 import org.junit.Test;
 
 import java.io.DataInput;
@@ -33,8 +32,14 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for the type extraction of {@link Writable}.
+ */
 @SuppressWarnings("serial")
 public class WritableExtractionTest {
 
@@ -64,7 +69,7 @@ public class WritableExtractionTest {
 				TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
 		assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
 
-		TypeInformation<ViaAbstractClassExtension> info3 = 
+		TypeInformation<ViaAbstractClassExtension> info3 =
 				TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
 		assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
 	}
@@ -110,7 +115,7 @@ public class WritableExtractionTest {
 			}
 		};
 
-		TypeInformation<DirectWritable> outType = 
+		TypeInformation<DirectWritable> outType =
 				TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
 
 		assertTrue(outType instanceof WritableTypeInfo);
@@ -119,14 +124,14 @@ public class WritableExtractionTest {
 
 	@Test
 	public void testExtractAsPartOfPojo() {
-		PojoTypeInfo<PojoWithWritable> pojoInfo = 
+		PojoTypeInfo<PojoWithWritable> pojoInfo =
 				(PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
 
 		boolean foundWritable = false;
 		for (int i = 0; i < pojoInfo.getArity(); i++) {
 			PojoField field = pojoInfo.getPojoFieldAt(i);
 			String name = field.getField().getName();
-			
+
 			if (name.equals("hadoopCitizen")) {
 				if (foundWritable) {
 					fail("already seen");
@@ -134,10 +139,10 @@ public class WritableExtractionTest {
 				foundWritable = true;
 				assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
 				assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
-				
+
 			}
 		}
-		
+
 		assertTrue("missed the writable type", foundWritable);
 	}
 
@@ -152,9 +157,9 @@ public class WritableExtractionTest {
 		};
 
 		@SuppressWarnings("unchecked")
-		TypeInformation<Writable> inType = 
+		TypeInformation<Writable> inType =
 				(TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
-		
+
 		try {
 			TypeExtractor.getMapReturnTypes(function, inType);
 			fail("exception expected");
@@ -168,11 +173,11 @@ public class WritableExtractionTest {
 	//  test type classes
 	// ------------------------------------------------------------------------
 
-	public interface ExtendedWritable extends Writable {}
+	private interface ExtendedWritable extends Writable {}
 
-	public static abstract class AbstractWritable implements Writable {}
+	private abstract static class AbstractWritable implements Writable {}
 
-	public static class DirectWritable implements Writable {
+	private static class DirectWritable implements Writable {
 
 		@Override
 		public void write(DataOutput dataOutput) throws IOException {}
@@ -181,7 +186,7 @@ public class WritableExtractionTest {
 		public void readFields(DataInput dataInput) throws IOException {}
 	}
 
-	public static class ViaInterfaceExtension implements ExtendedWritable {
+	private static class ViaInterfaceExtension implements ExtendedWritable {
 
 		@Override
 		public void write(DataOutput dataOutput) throws IOException {}
@@ -190,7 +195,7 @@ public class WritableExtractionTest {
 		public void readFields(DataInput dataInput) throws IOException {}
 	}
 
-	public static class ViaAbstractClassExtension extends AbstractWritable {
+	private static class ViaAbstractClassExtension extends AbstractWritable {
 
 		@Override
 		public void write(DataOutput dataOutput) throws IOException {}
@@ -199,6 +204,9 @@ public class WritableExtractionTest {
 		public void readFields(DataInput dataInput) throws IOException {}
 	}
 
+	/**
+	 * Test Pojo containing a {@link DirectWritable}.
+	 */
 	public static class PojoWithWritable {
 		public String str;
 		public DirectWritable hadoopCitizen;

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
index 3d2b652..7262bb7 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.Writable;
 
+import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,6 +31,9 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+/**
+ * Tests for the type information parsing of {@link Writable}.
+ */
 public class WritableInfoParserTest {
 
 	@Test
@@ -66,7 +69,7 @@ public class WritableInfoParserTest {
 	//  Test types
 	// ------------------------------------------------------------------------
 
-	public static class MyWritable implements Writable {
+	private static class MyWritable implements Writable {
 
 		@Override
 		public void write(DataOutput out) throws IOException {}
@@ -75,6 +78,9 @@ public class WritableInfoParserTest {
 		public void readFields(DataInput in) throws IOException {}
 	}
 
+	/**
+	 * Test Pojo containing a {@link Writable}.
+	 */
 	public static class MyPojo {
 		public Integer basic;
 		public Tuple2<String, Integer> tuple;

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
index 666ab84..903c856 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+import org.apache.hadoop.io.Writable;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
-import org.apache.hadoop.io.Writable;
 
 /**
  * Test for {@link WritableTypeInfo}.
@@ -41,7 +43,7 @@ public class WritableTypeInfoTest extends TypeInformationTestBase<WritableTypeIn
 	//  test types
 	// ------------------------------------------------------------------------
 
-	public static class TestClass implements Writable {
+	private static class TestClass implements Writable {
 
 		@Override
 		public void write(DataOutput dataOutput) throws IOException {}
@@ -50,7 +52,7 @@ public class WritableTypeInfoTest extends TypeInformationTestBase<WritableTypeIn
 		public void readFields(DataInput dataInput) throws IOException {}
 	}
 
-	public static class AlternateClass implements Writable {
+	private static class AlternateClass implements Writable {
 
 		@Override
 		public void write(DataOutput dataOutput) throws IOException {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
index 8c3a8cd..6101c0a 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -19,64 +19,68 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.hadoop.io.Writable;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+/**
+ * A {@link Writable} and {@link Comparable} wrapper for a string array.
+ */
 public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> {
-	
+
 	private String[] array = new String[0];
-	
+
 	public StringArrayWritable() {
 		super();
 	}
-	
+
 	public StringArrayWritable(String[] array) {
 		this.array = array;
 	}
-	
+
 	@Override
 	public void write(DataOutput out) throws IOException {
 		out.writeInt(this.array.length);
-		
-		for(String str : this.array) {
+
+		for (String str : this.array) {
 			byte[] b = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			out.writeInt(b.length);
 			out.write(b);
 		}
 	}
-	
+
 	@Override
 	public void readFields(DataInput in) throws IOException {
 		this.array = new String[in.readInt()];
-		
-		for(int i = 0; i < this.array.length; i++) {
+
+		for (int i = 0; i < this.array.length; i++) {
 			byte[] b = new byte[in.readInt()];
 			in.readFully(b);
 			this.array[i] = new String(b, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
-	
+
 	@Override
 	public int compareTo(StringArrayWritable o) {
-		if(this.array.length != o.array.length) {
+		if (this.array.length != o.array.length) {
 			return this.array.length - o.array.length;
 		}
-		
-		for(int i = 0; i < this.array.length; i++) {
+
+		for (int i = 0; i < this.array.length; i++) {
 			int comp = this.array[i].compareTo(o.array[i]);
-			if(comp != 0) {
+			if (comp != 0) {
 				return comp;
 			}
 		}
 		return 0;
 	}
-	
+
 	@Override
 	public boolean equals(Object obj) {
-		if(!(obj instanceof StringArrayWritable)) {
+		if (!(obj instanceof StringArrayWritable)) {
 			return false;
 		}
 		return this.compareTo((StringArrayWritable) obj) == 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
index 96f844c..104f754 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -22,30 +22,33 @@ import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+/**
+ * Tests for the {@link WritableComparator}.
+ */
 public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {
-	
+
 	StringArrayWritable[] data = new StringArrayWritable[]{
 			new StringArrayWritable(new String[]{}),
 			new StringArrayWritable(new String[]{""}),
-			new StringArrayWritable(new String[]{"a","a"}),
-			new StringArrayWritable(new String[]{"a","b"}),
-			new StringArrayWritable(new String[]{"c","c"}),
-			new StringArrayWritable(new String[]{"d","f"}),
-			new StringArrayWritable(new String[]{"d","m"}),
-			new StringArrayWritable(new String[]{"z","x"}),
-			new StringArrayWritable(new String[]{"a","a", "a"})
+			new StringArrayWritable(new String[]{"a", "a"}),
+			new StringArrayWritable(new String[]{"a", "b"}),
+			new StringArrayWritable(new String[]{"c", "c"}),
+			new StringArrayWritable(new String[]{"d", "f"}),
+			new StringArrayWritable(new String[]{"d", "m"}),
+			new StringArrayWritable(new String[]{"z", "x"}),
+			new StringArrayWritable(new String[]{"a", "a", "a"})
 	};
-	
+
 	@Override
 	protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) {
 		return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class);
 	}
-	
+
 	@Override
 	protected TypeSerializer<StringArrayWritable> createSerializer() {
 		return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
 	}
-	
+
 	@Override
 	protected StringArrayWritable[] getSortedTestData() {
 		return data;

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
index 94e759d..f8d86de 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.util.UUID;
 
+/**
+ * Tests for the {@link WritableComparator} with {@link WritableID}.
+ */
 public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
 	@Override
 	protected TypeComparator<WritableID> createComparator(boolean ascending) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
index 4274cf6..47ddf42 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.DataInput;
@@ -25,6 +26,9 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.UUID;
 
+/**
+ * Test object that is both {@link Comparable} and {@link Writable}.
+ */
 public class WritableID implements WritableComparable<WritableID> {
 	private UUID uuid;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
index bb5f4d4..9779c17 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -22,29 +22,33 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+
 import org.junit.Test;
 
+/**
+ * Tests for the {@link WritableSerializer}.
+ */
 public class WritableSerializerTest {
-	
+
 	@Test
 	public void testStringArrayWritable() {
 		StringArrayWritable[] data = new StringArrayWritable[]{
 				new StringArrayWritable(new String[]{}),
 				new StringArrayWritable(new String[]{""}),
-				new StringArrayWritable(new String[]{"a","a"}),
-				new StringArrayWritable(new String[]{"a","b"}),
-				new StringArrayWritable(new String[]{"c","c"}),
-				new StringArrayWritable(new String[]{"d","f"}),
-				new StringArrayWritable(new String[]{"d","m"}),
-				new StringArrayWritable(new String[]{"z","x"}),
-				new StringArrayWritable(new String[]{"a","a", "a"})
+				new StringArrayWritable(new String[]{"a", "a"}),
+				new StringArrayWritable(new String[]{"a", "b"}),
+				new StringArrayWritable(new String[]{"c", "c"}),
+				new StringArrayWritable(new String[]{"d", "f"}),
+				new StringArrayWritable(new String[]{"d", "m"}),
+				new StringArrayWritable(new String[]{"z", "x"}),
+				new StringArrayWritable(new String[]{"a", "a", "a"})
 		};
-		
+
 		WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
 		WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig());
-		
-		SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data);
-		
+
+		SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer, writableTypeInfo.getTypeClass(), -1, data);
+
 		testInstance.testAll();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
index 2af7730..dca043d 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.util.UUID;
 
+/**
+ * Tests for the {@link WritableSerializer} with {@link WritableID}.
+ */
 public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
 	@Override
 	protected TypeSerializer<WritableID> createSerializer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
index 6f7673b..3bda1e6 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -20,10 +20,14 @@ package org.apache.flink.hadoopcompatibility;
 
 import org.apache.flink.api.java.utils.AbstractParameterToolTest;
 import org.apache.flink.api.java.utils.ParameterTool;
+
 import org.junit.Test;
 
 import java.io.IOException;
 
+/**
+ * Tests for the {@link HadoopUtils}.
+ */
 public class HadoopUtilsTest extends AbstractParameterToolTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
index 4d1acb4..2fb2f88 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred;
 
-import java.io.IOException;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -38,6 +37,11 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
+
+/**
+ * IT cases for the {@link HadoopMapFunction}.
+ */
 @RunWith(Parameterized.class)
 public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
 
@@ -124,53 +128,60 @@ public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
 
 		compareResultsByLinesInMemory(expected, resultPath);
 	}
-	
 
-	
+	/**
+	 * {@link Mapper} that only forwards records containing "bananas".
+	 */
 	public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
-		
+
 		@Override
-		public void map(final IntWritable k, final Text v, 
+		public void map(final IntWritable k, final Text v,
 				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
-			if ( v.toString().contains("bananas") ) {
-				out.collect(k,v);
+			if (v.toString().contains("bananas")) {
+				out.collect(k, v);
 			}
 		}
-		
+
 		@Override
 		public void configure(final JobConf arg0) { }
 
 		@Override
 		public void close() throws IOException { }
 	}
-	
+
+	/**
+	 * {@link Mapper} that emits every record twice: once unchanged and once with the value converted to upper case.
+	 */
 	public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
-		
+
 		@Override
-		public void map(final IntWritable k, final Text v, 
+		public void map(final IntWritable k, final Text v,
 				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
 			out.collect(k, v);
 			out.collect(k, new Text(v.toString().toUpperCase()));
 		}
-		
+
 		@Override
 		public void configure(final JobConf arg0) { }
 
 		@Override
 		public void close() throws IOException { }
 	}
-	
+
+	/**
+	 * {@link Mapper} that only forwards records whose value starts with the prefix configured under {@code my.filterPrefix}.
+	 */
 	public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
 		private String filterPrefix;
-		
+
 		@Override
 		public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
 				throws IOException {
-			if(v.toString().startsWith(filterPrefix)) {
+			if (v.toString().startsWith(filterPrefix)) {
 				out.collect(k, v);
 			}
 		}
-		
+
 		@Override
 		public void configure(JobConf c) {
 			filterPrefix = c.get("my.filterPrefix");

http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index 0b5a366..145eaaa 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -22,11 +22,15 @@ import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredComp
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.OperatingSystem;
+
 import org.junit.Assume;
 import org.junit.Before;
 
+/**
+ * IT case that runs {@link HadoopMapredCompatWordCount} and compares its output against the expected word counts.
+ */
 public class HadoopMapredITCase extends JavaProgramTestBase {
-	
+
 	protected String textPath;
 	protected String resultPath;
 
@@ -47,7 +51,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
 	}
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });