You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 16:23:02 UTC

[2/4] flink git commit: [FLINK-7185] Activate checkstyle flink-java/io

[FLINK-7185] Activate checkstyle flink-java/io

This closes #4340.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9c9fb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9c9fb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9c9fb5

Branch: refs/heads/master
Commit: 0c9c9fb5cb7a8a27d444db5c725c8abd792ca761
Parents: d578810
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:31:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 16:37:52 2017 +0200

----------------------------------------------------------------------
 .../api/java/io/CollectionInputFormat.java      |  40 ++--
 .../flink/api/java/io/CsvInputFormat.java       |  17 +-
 .../flink/api/java/io/CsvOutputFormat.java      |  26 +--
 .../org/apache/flink/api/java/io/CsvReader.java |  97 ++++----
 .../api/java/io/DiscardingOutputFormat.java     |   1 -
 .../flink/api/java/io/IteratorInputFormat.java  |  12 +-
 .../java/io/LocalCollectionOutputFormat.java    |  21 +-
 .../java/io/ParallelIteratorInputFormat.java    |  20 +-
 .../flink/api/java/io/PojoCsvInputFormat.java   |   5 +
 .../flink/api/java/io/PrimitiveInputFormat.java |   5 +-
 .../flink/api/java/io/PrintingOutputFormat.java |  30 +--
 .../flink/api/java/io/RowCsvInputFormat.java    |   3 +
 .../flink/api/java/io/SplitDataProperties.java  |  85 ++++---
 .../flink/api/java/io/TextInputFormat.java      |  50 ++--
 .../flink/api/java/io/TextOutputFormat.java     |  50 ++--
 .../flink/api/java/io/TextValueInputFormat.java |  57 ++---
 .../flink/api/java/io/TupleCsvInputFormat.java  |   9 +-
 .../api/java/io/TypeSerializerOutputFormat.java |   6 +-
 .../apache/flink/api/java/io/CSVReaderTest.java |  90 ++++----
 .../api/java/io/CollectionInputFormatTest.java  |  67 +++---
 .../flink/api/java/io/CsvInputFormatTest.java   | 230 ++++++++++---------
 .../flink/api/java/io/CsvOutputFormatTest.java  |   7 +-
 .../flink/api/java/io/FromElementsTest.java     |   8 +-
 .../api/java/io/PrimitiveInputFormatTest.java   |  35 ++-
 .../api/java/io/RowCsvInputFormatTest.java      |  15 +-
 .../flink/api/java/io/TextInputFormatTest.java  | 129 +++++------
 .../api/java/io/TypeSerializerFormatTest.java   |   6 +-
 tools/maven/suppressions-java.xml               |   7 -
 28 files changed, 587 insertions(+), 541 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 90e6712..eebe56f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
@@ -34,6 +26,14 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
 /**
  * An input format that returns objects from a collection.
  */
@@ -55,7 +55,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		}
 
 		this.serializer = serializer;
-		
+
 		this.dataSet = dataSet;
 	}
 
@@ -67,10 +67,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 	@Override
 	public void open(GenericInputSplit split) throws IOException {
 		super.open(split);
-		
+
 		this.iterator = this.dataSet.iterator();
 	}
-	
+
 	@Override
 	public T nextRecord(T record) throws IOException {
 		return this.iterator.next();
@@ -80,10 +80,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		out.defaultWriteObject();
-		
+
 		final int size = dataSet.size();
 		out.writeInt(size);
-		
+
 		if (size > 0) {
 			DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
 			for (T element : dataSet){
@@ -97,7 +97,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
-		
+
 		if (collectionLength > 0) {
 			try {
 				DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(in);
@@ -113,9 +113,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 		dataSet = list;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
@@ -136,14 +136,14 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		sb.append(']');
 		return sb.toString();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static <X> void checkCollection(Collection<X> elements, Class<X> viewedAs) {
 		if (elements == null || viewedAs == null) {
 			throw new NullPointerException();
 		}
-		
+
 		for (X elem : elements) {
 			if (elem == null) {
 				throw new IllegalArgumentException("The collection must not contain null elements.");
@@ -157,7 +157,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 			if (!viewedAs.isAssignableFrom(elem.getClass()) &&
 					!(elem.getClass().toString().equals("class scala.runtime.BoxedUnit") && viewedAs.equals(void.class))) {
 
-				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + 
+				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
 							viewedAs.getCanonicalName());
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index f1a16ea..0bd4e69 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -21,13 +21,18 @@ package org.apache.flink.api.java.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import java.io.IOException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
 
+/**
+ * InputFormat that reads csv files.
+ *
+ * @param <OUT>
+ */
 @Internal
 public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
@@ -38,7 +43,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 	public static final String DEFAULT_FIELD_DELIMITER = ",";
 
 	protected transient Object[] parsedValues;
-	
+
 	protected CsvInputFormat(Path filePath) {
 		super(filePath);
 	}
@@ -63,7 +68,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
 		// left to right evaluation makes access [0] okay
 		// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
-		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n') {
 			this.lineDelimiterIsLinebreak = true;
 		}
 
@@ -123,7 +128,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
 	protected static boolean[] createDefaultMask(int size) {
 		boolean[] includedMask = new boolean[size];
-		for (int x=0; x<includedMask.length; x++) {
+		for (int x = 0; x < includedMask.length; x++) {
 			includedMask[x] = true;
 		}
 		return includedMask;
@@ -154,5 +159,5 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 	public String toString() {
 		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index c2fe13c..44fd679 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -16,18 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +27,15 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
 /**
  * This is an OutputFormat to serialize {@link org.apache.flink.api.java.tuple.Tuple}s to text. The output is
  * structured by record delimiters and field delimiters as common in CSV files.
@@ -121,8 +122,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	/**
 	 * Configures the format to either allow null values (writing an empty field),
 	 * or to throw an exception when encountering a null field.
-	 * <p>
-	 * by default, null values are disallowed.
+	 *
+	 * <p>by default, null values are disallowed.
 	 *
 	 * @param allowNulls Flag to indicate whether the output format should accept null values.
 	 */
@@ -144,8 +145,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	 * Configures whether the output format should quote string values. String values are fields
 	 * of type {@link java.lang.String} and {@link org.apache.flink.types.StringValue}, as well as
 	 * all subclasses of the latter.
-	 * <p>
-	 * By default, strings are not quoted.
+	 *
+	 * <p>By default, strings are not quoted.
 	 *
 	 * @param quoteStrings Flag indicating whether string fields should be quoted.
 	 */
@@ -215,7 +216,6 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	}
 
     /**
-	 *
 	 * The purpose of this method is solely to check whether the data type to be processed
 	 * is in fact a tuple type.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index ce2f4fa..684911a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -23,9 +23,9 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
 import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -44,14 +44,13 @@ import java.util.Arrays;
 public class CsvReader {
 
 	private final Path path;
-	
+
 	private final ExecutionEnvironment executionContext;
-	
-	
+
 	protected boolean[] includedMask;
-	
+
 	protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
-	
+
 	protected String fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
 
 	protected String commentPrefix = null; //default: no comments
@@ -61,35 +60,35 @@ public class CsvReader {
 	protected char quoteCharacter = '"';
 
 	protected boolean skipFirstLineAsHeader = false;
-	
+
 	protected boolean ignoreInvalidLines = false;
 
 	private String charset = "UTF-8";
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 		Preconditions.checkNotNull(executionContext, "The execution context may not be null.");
-		
+
 		this.path = filePath;
 		this.executionContext = executionContext;
 	}
-	
+
 	public CsvReader(String filePath, ExecutionEnvironment executionContext) {
 		this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
 	}
-	
+
 	public Path getFilePath() {
 		return this.path;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Configures the delimiter that separates the lines/rows. The linebreak character
 	 * ({@code '\n'}) is used by default.
-	 * 
+	 *
 	 * @param delimiter The delimiter that separates the rows.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -97,15 +96,15 @@ public class CsvReader {
 		if (delimiter == null || delimiter.length() == 0) {
 			throw new IllegalArgumentException("The delimiter must not be null or an empty string");
 		}
-		
+
 		this.lineDelimiter = delimiter;
 		return this;
 	}
-	
+
 	/**
 	 * Configures the delimiter that separates the fields within a row. The comma character
 	 * ({@code ','}) is used by default.
-	 * 
+	 *
 	 * @param delimiter The delimiter that separates the fields in one row.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 *
@@ -148,7 +147,7 @@ public class CsvReader {
 	 * Configures the string that starts comments.
 	 * By default comments will be treated as invalid lines.
 	 * This function only recognizes comments which start at the beginning of the line!
-	 * 
+	 *
 	 * @param commentPrefix The string that starts the comments.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -156,7 +155,7 @@ public class CsvReader {
 		if (commentPrefix == null || commentPrefix.length() == 0) {
 			throw new IllegalArgumentException("The comment prefix must not be null or an empty string");
 		}
-		
+
 		this.commentPrefix = commentPrefix;
 		return this;
 	}
@@ -172,7 +171,7 @@ public class CsvReader {
 	}
 
 	/**
-	 * Sets the charset of the reader
+	 * Sets the charset of the reader.
 	 *
 	 * @param charset The character set to set.
 	 */
@@ -189,7 +188,7 @@ public class CsvReader {
 	 * the boolean array is {@code true}.
 	 * The number of fields in the result is consequently equal to the number of times that {@code true}
 	 * occurs in the fields array.
-	 * 
+	 *
 	 * @param fields The array of flags that describes which fields are to be included and which not.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -197,14 +196,14 @@ public class CsvReader {
 		if (fields == null || fields.length == 0) {
 			throw new IllegalArgumentException("The set of included fields must not be null or empty.");
 		}
-		
+
 		int lastTruePos = -1;
 		for (int i = 0; i < fields.length; i++) {
 			if (fields[i]) {
 				lastTruePos = i;
 			}
 		}
-		
+
 		if (lastTruePos == -1) {
 			throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
 		}
@@ -225,13 +224,13 @@ public class CsvReader {
 	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
 	 * {@code false}). The result contains the fields where the corresponding position in
 	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
-	 * 
+	 *
 	 * @param mask The string mask defining which fields to include and which to skip.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
 	public CsvReader includeFields(String mask) {
 		boolean[] includedMask = new boolean[mask.length()];
-		
+
 		for (int i = 0; i < mask.length(); i++) {
 			char c = mask.charAt(i);
 			if (c == '1' || c == 'T' || c == 't') {
@@ -240,10 +239,10 @@ public class CsvReader {
 				throw new IllegalArgumentException("Mask string may contain only '0' and '1'.");
 			}
 		}
-		
+
 		return includeFields(includedMask);
 	}
-	
+
 	/**
 	 * Configures which fields of the CSV file should be included and which should be skipped. The
 	 * bits in the value (read from least significant to most significant) define whether the field at
@@ -252,14 +251,14 @@ public class CsvReader {
 	 * non-zero bit.
 	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
 	 * include the fields where the corresponding bit is one.
-	 * <p>
-	 * Examples:
+	 *
+	 * <p>Examples:
 	 * <ul>
 	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
 	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
 	 *       two and three, skip fields four and five, and include field six.</li>
 	 * </ul>
-	 * 
+	 *
 	 * @param mask The bit mask defining which fields to include and which to skip.
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
@@ -267,36 +266,36 @@ public class CsvReader {
 		if (mask == 0) {
 			throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
 		}
-		
+
 		ArrayList<Boolean> fields = new ArrayList<Boolean>();
 
 		while (mask != 0) {
 			fields.add((mask & 0x1L) != 0);
 			mask >>>= 1;
 		}
-		
+
 		boolean[] fieldsArray = new boolean[fields.size()];
 		for (int i = 0; i < fieldsArray.length; i++) {
 			fieldsArray[i] = fields.get(i);
 		}
-		
+
 		return includeFields(fieldsArray);
 	}
 
 	/**
 	 * Sets the CSV reader to ignore the first line. This is useful for files that contain a header line.
-	 * 
+	 *
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
 	public CsvReader ignoreFirstLine() {
 		skipFirstLineAsHeader = true;
 		return this;
 	}
-	
+
 	/**
-	 * Sets the CSV reader to ignore any invalid lines. 
+	 * Sets the CSV reader to ignore any invalid lines.
 	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
-	 * 
+	 *
 	 * @return The CSV reader instance itself, to allow for fluent function chaining.
 	 */
 	public CsvReader ignoreInvalidLines(){
@@ -325,12 +324,12 @@ public class CsvReader {
 
 		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
 	 * {@link Tuple}. The type information for the fields is obtained from the type class. The type
 	 * consequently needs to specify all generic field types of the tuple.
-	 * 
+	 *
 	 * @param targetType The class of the target type, needs to be a subclass of Tuple.
 	 * @return The DataSet representing the parsed CSV data.
 	 */
@@ -339,24 +338,24 @@ public class CsvReader {
 		if (!Tuple.class.isAssignableFrom(targetType)) {
 			throw new IllegalArgumentException("The target type must be a subclass of " + Tuple.class.getName());
 		}
-		
+
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
 		CsvInputFormat<T> inputFormat = new TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask);
-		
+
 		Class<?>[] classes = new Class<?>[typeInfo.getArity()];
 		for (int i = 0; i < typeInfo.getArity(); i++) {
 			classes[i] = typeInfo.getTypeAt(i).getTypeClass();
 		}
-		
+
 		configureInputFormat(inputFormat);
 		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Miscellaneous
 	// --------------------------------------------------------------------------------------------
-	
+
 	private void configureInputFormat(CsvInputFormat<?> format) {
 		format.setCharset(this.charset);
 		format.setDelimiter(this.lineDelimiter);
@@ -368,10 +367,10 @@ public class CsvReader {
 			format.enableQuotedStringParsing(this.quoteCharacter);
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------	
+
+	// --------------------------------------------------------------------------------------------
 	// The following lines are generated.
-	// --------------------------------------------------------------------------------------------	
+	// --------------------------------------------------------------------------------------------
 	// BEGIN_OF_TUPLE_DEPENDENT_CODE
 	// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
index f01d864..7358b14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Public;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
index cb8bd6a..05f3ccd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
-import java.io.Serializable;
-import java.util.Iterator;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
 
+import java.io.Serializable;
+import java.util.Iterator;
+
 /**
  * An input format that returns objects from an iterator.
  */
@@ -35,13 +34,12 @@ public class IteratorInputFormat<T> extends GenericInputFormat<T> implements Non
 	private static final long serialVersionUID = 1L;
 
 	private Iterator<T> iterator; // input data as serializable iterator
-	
-	
+
 	public IteratorInputFormat(Iterator<T> iterator) {
 		if (!(iterator instanceof Serializable)) {
 			throw new IllegalArgumentException("The data source iterator must be serializable.");
 		}
-		
+
 		this.iterator = iterator;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
index 65ed6c3..bcb1cf1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.api.java.io;
 
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.RichOutputFormat;
@@ -33,15 +26,21 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
- *  An output format that writes record into collection
+ *  An output format that adds records to a collection.
  */
 @PublicEvolving
 public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implements InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
 
-	private static Map<Integer,Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
+	private static final Map<Integer, Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
 
 	private transient ArrayList<T> taskResult;
 
@@ -67,7 +66,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
 	@Override
 	public void configure(Configuration parameters) {}
 
-
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
 		this.taskResult = new ArrayList<T>();
@@ -80,7 +78,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
 		this.taskResult.add(recordCopy);
 	}
 
-
 	@Override
 	public void close() throws IOException {
 		synchronized (RESULT_HOLDER) {
@@ -93,6 +90,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
 	@Override
 	@SuppressWarnings("unchecked")
 	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		this.typeSerializer = (TypeSerializer<T>)type.createSerializer(executionConfig);
+		this.typeSerializer = (TypeSerializer<T>) type.createSerializer(executionConfig);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
index a6ac853..25e25b4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.util.SplittableIterator;
 
+import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * An input format that generates data in parallel through a {@link SplittableIterator}.
@@ -34,25 +33,22 @@ import org.apache.flink.util.SplittableIterator;
 public class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> {
 
 	private static final long serialVersionUID = 1L;
-	
-	
+
 	private final SplittableIterator<T> source;
-	
+
 	private transient Iterator<T> splitIterator;
-	
-	
-	
+
 	public ParallelIteratorInputFormat(SplittableIterator<T> iterator) {
 		this.source = iterator;
 	}
-	
+
 	@Override
 	public void open(GenericInputSplit split) throws IOException {
 		super.open(split);
-		
+
 		this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits());
 	}
-	
+
 	@Override
 	public boolean reachedEnd() {
 		return !this.splitIterator.hasNext();

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
index 990e9e6..804d02b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Internal;
@@ -29,6 +30,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Input format that reads csv into POJOs.
+ * @param <OUT> resulting POJO type
+ */
 @Internal
 public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index d454765..794703b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -44,7 +44,6 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 
 	private transient FieldParser<OT> parser;
 
-
 	public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
 		super(filePath, null);
 		this.primitiveClass = primitiveClass;
@@ -70,7 +69,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 	public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
 		// Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
 		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
-			&& offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+			&& offset + numBytes >= 1 && bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
 			numBytes -= 1;
 		}
 
@@ -79,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 			return parser.getLastResult();
 		} else {
 			String s = new String(bytes, offset, numBytes, getCharset());
-			throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
+			throw new IOException("Could not parse value: \"" + s + "\" as type " + primitiveClass.getSimpleName());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index a010fd8..0ab1abb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.PrintStream;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.PrintStream;
+
+/**
+ * Output format that prints results into either stdout or stderr.
+ * @param <T>
+ */
 @PublicEvolving
 public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 
@@ -37,19 +41,19 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 	private boolean target;
 
 	private transient PrintStream stream;
-	
+
 	private transient String prefix;
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Instantiates a printing output format that prints to standard out.
 	 */
 	public PrintingOutputFormat() {}
-	
+
 	/**
 	 * Instantiates a printing output format that prints to standard out.
-	 * 
+	 *
 	 * @param stdErr True, if the format should print to standard error instead of standard out.
 	 */
 	public PrintingOutputFormat(boolean stdErr) {
@@ -65,20 +69,18 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 		this(stdErr);
 		this.sinkIdentifier = sinkIdentifier;
 	}
-	
+
 	public void setTargetToStandardOut() {
 		this.target = STD_OUT;
 	}
-	
+
 	public void setTargetToStandardErr() {
 		this.target = STD_ERR;
 	}
-	
-	
+
 	@Override
 	public void configure(Configuration parameters) {}
 
-
 	@Override
 	public void open(int taskNumber, int numTasks) {
 		// get the target stream
@@ -116,9 +118,9 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 		this.prefix = null;
 		this.sinkIdentifier = null;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index b752966..15ef90e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -29,6 +29,9 @@ import org.apache.flink.types.parser.FieldParser;
 
 import java.util.Arrays;
 
+/**
+ * Input format that reads csv into {@link Row}.
+ */
 @PublicEvolving
 public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultTypeQueryable<Row> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
index db09380..c1487f6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.common.operators.Keys;
 
 import java.util.Arrays;
 
@@ -34,7 +34,7 @@ import java.util.Arrays;
  * SplitDataProperties define data properties on {@link org.apache.flink.core.io.InputSplit}
  * generated by the {@link org.apache.flink.api.common.io.InputFormat} of a {@link DataSource}.
  *
- * InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
+ * <p>InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
  * SplitDataProperties can define that the elements which are generated by the associated InputFormat
  * are
  * <ul>
@@ -46,7 +46,7 @@ import java.util.Arrays;
  *    are in the defined order.</li>
  * </ul>
  *
- * <b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
+ * <p><b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
  * data reorganization steps such as shuffling or sorting can be avoided.
  * HOWEVER, if SplitDataProperties are not correctly defined, the result of the program might be wrong!</b>
  *
@@ -90,8 +90,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	/**
 	 * Defines that data is partitioned across input splits on the fields defined by field positions.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -106,8 +106,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that data is partitioned using a specific partitioning method
 	 * across input splits on the fields defined by field positions.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -137,8 +137,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that data is partitioned across input splits on the fields defined by field expressions.
  	 * Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -154,8 +154,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * across input splits on the fields defined by field expressions.
 	 * Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records sharing the same key (combination) must be contained in a single input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -165,7 +165,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, String partitionFields) {
 
-		if(partitionFields == null) {
+		if (partitionFields == null) {
 			throw new InvalidProgramException("PartitionFields may not be null.");
 		}
 
@@ -175,7 +175,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 		}
 
 		this.splitPartitionKeys = getAllFlatKeys(partitionKeysA);
-		if(partitionMethodId != null) {
+		if (partitionMethodId != null) {
 			this.splitPartitioner = new SourcePartitionerMarker<>(partitionMethodId);
 		}
 		else {
@@ -189,8 +189,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that the data within an input split is grouped on the fields defined by the field positions.
 	 * All records sharing the same key (combination) must be subsequently emitted by the input
 	 * format for each input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -199,13 +199,13 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsGroupedBy(int... groupFields) {
 
-		if(groupFields == null) {
+		if (groupFields == null) {
 			throw new InvalidProgramException("GroupFields may not be null.");
 		} else if (groupFields.length == 0) {
 			throw new InvalidProgramException("GroupFields may not be empty.");
 		}
 
-		if(this.splitOrdering != null) {
+		if (this.splitOrdering != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
@@ -219,8 +219,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records sharing the same key (combination) must be subsequently emitted by the input
 	 * format for each input split.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -229,7 +229,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsGroupedBy(String groupFields) {
 
-		if(groupFields == null) {
+		if (groupFields == null) {
 			throw new InvalidProgramException("GroupFields may not be null.");
 		}
 
@@ -238,7 +238,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 			throw new InvalidProgramException("GroupFields may not be empty.");
 		}
 
-		if(this.splitOrdering != null) {
+		if (this.splitOrdering != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
@@ -251,8 +251,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that the data within an input split is sorted on the fields defined by the field positions
 	 * in the specified orders.
 	 * All records of an input split must be emitted by the input format in the defined order.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -262,7 +262,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsOrderedBy(int[] orderFields, Order[] orders) {
 
-		if(orderFields == null || orders == null) {
+		if (orderFields == null || orders == null) {
 			throw new InvalidProgramException("OrderFields or Orders may not be null.");
 		} else if (orderFields.length == 0) {
 			throw new InvalidProgramException("OrderFields may not be empty.");
@@ -272,17 +272,17 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 			throw new InvalidProgramException("Number of OrderFields and Orders must match.");
 		}
 
-		if(this.splitGroupKeys != null) {
+		if (this.splitGroupKeys != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
 		this.splitOrdering = new Ordering();
 
-		for(int i=0; i<orderFields.length; i++) {
+		for (int i = 0; i < orderFields.length; i++) {
 			int pos = orderFields[i];
 			int[] flatKeys = this.getAllFlatKeys(new int[]{pos});
 
-			for(int key : flatKeys) {
+			for (int key : flatKeys) {
 				// check for duplicates
 				for (int okey : splitOrdering.getFieldPositions()) {
 					if (key == okey) {
@@ -290,7 +290,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 					}
 				}
 				// append key
-				this.splitOrdering.appendOrdering(key, null, orders[i] );
+				this.splitOrdering.appendOrdering(key, null, orders[i]);
 			}
 		}
 		return this;
@@ -300,8 +300,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 * Defines that the data within an input split is sorted on the fields defined by the field expressions
 	 * in the specified orders. Multiple field expressions must be separated by the semicolon ';' character.
 	 * All records of an input split must be emitted by the input format in the defined order.
-	 * <br>
-	 * <b>
+	 *
+	 * <p><b>
 	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
 	 * </b>
 	 *
@@ -311,7 +311,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 	 */
 	public SplitDataProperties<T> splitsOrderedBy(String orderFields, Order[] orders) {
 
-		if(orderFields == null || orders == null) {
+		if (orderFields == null || orders == null) {
 			throw new InvalidProgramException("OrderFields or Orders may not be null.");
 		}
 
@@ -324,18 +324,18 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 			throw new InvalidProgramException("Number of OrderFields and Orders must match.");
 		}
 
-		if(this.splitGroupKeys != null) {
+		if (this.splitGroupKeys != null) {
 			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
 		}
 
 		this.splitOrdering = new Ordering();
 
-		for(int i=0; i<orderKeysA.length; i++) {
+		for (int i = 0; i < orderKeysA.length; i++) {
 			String keyExp = orderKeysA[i];
 			Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
 			int[] flatKeys = ek.computeLogicalKeyPositions();
 
-			for(int key : flatKeys) {
+			for (int key : flatKeys) {
 				// check for duplicates
 				for (int okey : splitOrdering.getFieldPositions()) {
 					if (key == okey) {
@@ -343,7 +343,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 					}
 				}
 				// append key
-				this.splitOrdering.appendOrdering(key, null, orders[i] );
+				this.splitOrdering.appendOrdering(key, null, orders[i]);
 			}
 		}
 		return this;
@@ -365,25 +365,24 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 		return this.splitOrdering;
 	}
 
-
 	/////////////////////// FLAT FIELD EXTRACTION METHODS
 
 	private int[] getAllFlatKeys(String[] fieldExpressions) {
 
 		int[] allKeys = null;
 
-		for(String keyExp : fieldExpressions) {
+		for (String keyExp : fieldExpressions) {
 			Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
 			int[] flatKeys = ek.computeLogicalKeyPositions();
 
-			if(allKeys == null) {
+			if (allKeys == null) {
 				allKeys = flatKeys;
 			} else {
 				// check for duplicates
-				for(int key1 : flatKeys) {
-					for(int key2 : allKeys) {
-						if(key1 == key2) {
-							throw new InvalidProgramException("Duplicate fields in field expression "+keyExp);
+				for (int key1 : flatKeys) {
+					for (int key2 : allKeys) {
+						if (key1 == key2) {
+							throw new InvalidProgramException("Duplicate fields in field expression " + keyExp);
 						}
 					}
 				}
@@ -425,7 +424,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
 
 		@Override
 		public boolean equals(Object o) {
-			if(o instanceof SourcePartitionerMarker) {
+			if (o instanceof SourcePartitionerMarker) {
 				return this.partitionMarker.equals(((SourcePartitionerMarker<?>) o).partitionMarker);
 			} else {
 				return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index b2554bf..7d4cf9c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -18,61 +18,63 @@
 
 package org.apache.flink.api.java.io;
 
-import java.io.IOException;
-import java.nio.charset.Charset;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Input Format that reads text files. Each line results in another element.
+ */
 @PublicEvolving
 public class TextInputFormat extends DelimitedInputFormat<String> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	/**
-	 * Code of \r, used to remove \r from a line when the line ends with \r\n
+	 * Code of \r, used to remove \r from a line when the line ends with \r\n.
 	 */
 	private static final byte CARRIAGE_RETURN = (byte) '\r';
 
 	/**
-	 * Code of \n, used to identify if \n is used as delimiter
+	 * Code of \n, used to identify if \n is used as delimiter.
 	 */
 	private static final byte NEW_LINE = (byte) '\n';
-	
-	
+
 	/**
 	 * The name of the charset to use for decoding.
 	 */
 	private String charsetName = "UTF-8";
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public TextInputFormat(Path filePath) {
 		super(filePath, null);
 	}
-	
-	// --------------------------------------------------------------------------------------------	
-	
+
+	// --------------------------------------------------------------------------------------------
+
 	public String getCharsetName() {
 		return charsetName;
 	}
-	
+
 	public void setCharsetName(String charsetName) {
 		if (charsetName == null) {
 			throw new IllegalArgumentException("Charset must not be null.");
 		}
-		
+
 		this.charsetName = charsetName;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void configure(Configuration parameters) {
 		super.configure(parameters);
-		
+
 		if (charsetName == null || !Charset.isSupported(charsetName)) {
 			throw new RuntimeException("Unsupported charset: " + charsetName);
 		}
@@ -83,17 +85,17 @@ public class TextInputFormat extends DelimitedInputFormat<String> {
 	@Override
 	public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
 		//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
-		if (this.getDelimiter() != null && this.getDelimiter().length == 1 
-				&& this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1 
-				&& bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+		if (this.getDelimiter() != null && this.getDelimiter().length == 1
+				&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
+				&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN){
 			numBytes -= 1;
 		}
-		
+
 		return new String(bytes, offset, numBytes, this.charsetName);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "TextInputFormat (" + getFilePath() + ") - " + this.charsetName;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
index d466082..006b571 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
@@ -18,65 +18,75 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
+/**
+ * A {@link FileOutputFormat} that writes objects to a text file.
+ *
+ * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
+ * @param <T> type of elements
+ */
 @PublicEvolving
 public class TextOutputFormat<T> extends FileOutputFormat<T> {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final int NEWLINE = '\n';
 
 	private String charsetName;
-	
+
 	private transient Charset charset;
 
 	// --------------------------------------------------------------------------------------------
 
-	public static interface TextFormatter<IN> extends Serializable {
-		public String format(IN value);
+
+	/**
+	 * Formatter that transforms values into their {@link String} representations.
+	 * @param <IN> type of input elements
+	 */
+	public interface TextFormatter<IN> extends Serializable {
+		String format(IN value);
 	}
 
 	public TextOutputFormat(Path outputPath) {
 		this(outputPath, "UTF-8");
 	}
-	
+
 	public TextOutputFormat(Path outputPath, String charset) {
 		super(outputPath);
 		this.charsetName = charset;
 	}
-	
-	
+
 	public String getCharsetName() {
 		return charsetName;
 	}
-	
+
 	public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
 		if (charsetName == null) {
 			throw new NullPointerException();
 		}
-		
+
 		if (!Charset.isSupported(charsetName)) {
 			throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
 		}
-		
+
 		this.charsetName = charsetName;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
 		super.open(taskNumber, numTasks);
-		
+
 		try {
 			this.charset = Charset.forName(charsetName);
 		}
@@ -87,16 +97,16 @@ public class TextOutputFormat<T> extends FileOutputFormat<T> {
 			throw new IOException("The charset " + charsetName + " is not supported.", e);
 		}
 	}
-	
+
 	@Override
 	public void writeRecord(T record) throws IOException {
 		byte[] bytes = record.toString().getBytes(charset);
 		this.stream.write(bytes);
 		this.stream.write(NEWLINE);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
index 45a2e3e..4721439 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.StringValue;
+
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -26,61 +32,58 @@ import java.nio.charset.CharsetDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.StringValue;
-
+/**
+ * Input format that reads text files.
+ */
 @PublicEvolving
 public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
-	
+
 	private static final long serialVersionUID = 1L;
-	
+
 	private String charsetName = "UTF-8";
-	
+
 	private boolean skipInvalidLines;
-	
+
 	private transient CharsetDecoder decoder;
-	
+
 	private transient ByteBuffer byteWrapper;
-	
+
 	private transient boolean ascii;
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public TextValueInputFormat(Path filePath) {
 		super(filePath, null);
 	}
-	
-	// --------------------------------------------------------------------------------------------	
-	
+
+	// --------------------------------------------------------------------------------------------
+
 	public String getCharsetName() {
 		return charsetName;
 	}
-	
+
 	public void setCharsetName(String charsetName) {
 		if (charsetName == null) {
 			throw new IllegalArgumentException("The charset name may not be null.");
 		}
-		
+
 		this.charsetName = charsetName;
 	}
-	
+
 	public boolean isSkipInvalidLines() {
 		return skipInvalidLines;
 	}
-	
+
 	public void setSkipInvalidLines(boolean skipInvalidLines) {
 		this.skipInvalidLines = skipInvalidLines;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void configure(Configuration parameters) {
 		super.configure(parameters);
-		
+
 		if (charsetName == null || !Charset.isSupported(charsetName)) {
 			throw new RuntimeException("Unsupported charset: " + charsetName);
 		}
@@ -88,7 +91,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
 		if (charsetName.equalsIgnoreCase(StandardCharsets.US_ASCII.name())) {
 			ascii = true;
 		}
-		
+
 		this.decoder = Charset.forName(charsetName).newDecoder();
 		this.byteWrapper = ByteBuffer.allocate(1);
 	}
@@ -109,7 +112,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
 			}
 			byteWrapper.limit(offset + numBytes);
 			byteWrapper.position(offset);
-				
+
 			try {
 				CharBuffer result = this.decoder.decode(byteWrapper);
 				reuse.setValue(result);
@@ -126,9 +129,9 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "TextValueInputFormat (" + getFilePath() + ") - " + this.charsetName + (this.skipInvalidLines ? "(skipping invalid lines)" : "");

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
index 6efd566..887620a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -15,15 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.Path;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.core.fs.Path;
 
+/**
+ * Input format that reads csv into tuples.
+ */
 @Internal
 public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
 
@@ -59,7 +62,7 @@ public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
 		super(filePath);
 		configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
 	}
-	
+
 	private void configure(String lineDelimiter, String fieldDelimiter,
 			TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
index 81a142e..108448d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
@@ -36,16 +36,16 @@ import java.io.IOException;
 public class TypeSerializerOutputFormat<T> extends BinaryOutputFormat<T> implements InputTypeConfigurable {
 
 	private static final long serialVersionUID = -6653022644629315158L;
-	
+
 	private TypeSerializer<T> serializer;
 
 	@Override
 	protected void serialize(T record, DataOutputView dataOutput) throws IOException {
-		if(serializer == null){
+		if (serializer == null){
 			throw new RuntimeException("TypeSerializerOutputFormat requires a type serializer to " +
 					"be defined.");
 		}
-		
+
 		serializer.serialize(record, dataOutput);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index de57e5c..5622930 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -59,7 +60,7 @@ public class CSVReaderTest {
 		reader.ignoreFirstLine();
 		Assert.assertTrue(reader.skipFirstLineAsHeader);
 	}
-	
+
 	@Test
 	public void testIgnoreInvalidLinesConfigure() {
 		CsvReader reader = getCsvReader();
@@ -67,7 +68,7 @@ public class CSVReaderTest {
 		reader.ignoreInvalidLines();
 		Assert.assertTrue(reader.ignoreInvalidLines);
 	}
-	
+
 	@Test
 	public void testIgnoreComments() {
 		CsvReader reader = getCsvReader();
@@ -89,38 +90,38 @@ public class CSVReaderTest {
 		CsvReader reader = getCsvReader();
 		reader.includeFields(true, true, true);
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("ttt");
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("TTT");
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("111");
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields(0x7L);
 		Assert.assertTrue(Arrays.equals(new boolean[] {true,  true, true}, reader.includedMask));
 	}
-	
+
 	@Test
 	public void testIncludeFieldsSparse() {
 		CsvReader reader = getCsvReader();
 		reader.includeFields(false, true, true, false, false, true, false, false);
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("fttfftff");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("FTTFFTFF");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields("01100100");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
@@ -128,16 +129,16 @@ public class CSVReaderTest {
 		reader = getCsvReader();
 		reader.includeFields("0t1f0TFF");
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-		
+
 		reader = getCsvReader();
 		reader.includeFields(0x26L);
 		Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
 	}
-	
+
 	@Test
 	public void testIllegalCharInStringMask() {
 		CsvReader reader = getCsvReader();
-		
+
 		try {
 			reader.includeFields("1t0Tfht");
 			Assert.fail("Reader accepted an invalid mask string");
@@ -146,12 +147,11 @@ public class CSVReaderTest {
 			// expected
 		}
 	}
-	
-	
+
 	@Test
 	public void testIncludeFieldsErrorWhenExcludingAll() {
 		CsvReader reader = getCsvReader();
-		
+
 		try {
 			reader.includeFields(false, false, false, false, false, false);
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -159,7 +159,7 @@ public class CSVReaderTest {
 		catch (IllegalArgumentException e) {
 			// all good
 		}
-		
+
 		try {
 			reader.includeFields(0);
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -167,7 +167,7 @@ public class CSVReaderTest {
 		catch (IllegalArgumentException e) {
 			// all good
 		}
-		
+
 		try {
 			reader.includeFields("ffffffffffffff");
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -175,7 +175,7 @@ public class CSVReaderTest {
 		catch (IllegalArgumentException e) {
 			// all good
 		}
-		
+
 		try {
 			reader.includeFields("00000000000000000");
 			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -191,12 +191,12 @@ public class CSVReaderTest {
 		DataSource<Item> items = reader.tupleType(Item.class);
 		Assert.assertTrue(items.getType().getTypeClass() == Item.class);
 	}
-	
+
 	@Test
 	public void testFieldTypes() throws Exception {
 		CsvReader reader = getCsvReader();
 		DataSource<Item> items = reader.tupleType(Item.class);
-		
+
 		TypeInformation<?> info = items.getType();
 		if (!info.isTupleType()) {
 			Assert.fail();
@@ -208,44 +208,44 @@ public class CSVReaderTest {
 			Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
 
 		}
-		
+
 		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) items.getInputFormat();
 		Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
 	}
-	
+
 	@Test
 	public void testSubClass() throws Exception {
 		CsvReader reader = getCsvReader();
 		DataSource<SubItem> sitems = reader.tupleType(SubItem.class);
 		TypeInformation<?> info = sitems.getType();
-		
+
 		Assert.assertEquals(true, info.isTupleType());
 		Assert.assertEquals(SubItem.class, info.getTypeClass());
-		
+
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-		
+
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
 		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
-		
+
 		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
 		Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
 	}
-	
+
 	@Test
 	public void testSubClassWithPartialsInHierarchie() throws Exception {
 		CsvReader reader = getCsvReader();
 		DataSource<FinalItem> sitems = reader.tupleType(FinalItem.class);
 		TypeInformation<?> info = sitems.getType();
-		
+
 		Assert.assertEquals(true, info.isTupleType());
 		Assert.assertEquals(FinalItem.class, info.getTypeClass());
-		
+
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-		
+
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
 		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
@@ -253,15 +253,15 @@ public class CSVReaderTest {
 		Assert.assertEquals(ValueTypeInfo.class, tinfo.getTypeAt(4).getClass());
 		Assert.assertEquals(StringValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(3)).getTypeClass());
 		Assert.assertEquals(LongValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(4)).getTypeClass());
-		
+
 		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
 		Assert.assertArrayEquals(new Class<?>[] {Integer.class, String.class, Double.class, StringValue.class, LongValue.class}, inputFormat.getFieldTypes());
 	}
-	
+
 	@Test
 	public void testUnsupportedPartialitem() throws Exception {
 		CsvReader reader = getCsvReader();
-		
+
 		try {
 			reader.tupleType(PartialItem.class);
 			Assert.fail("tupleType() accepted an underspecified generic class.");
@@ -295,32 +295,32 @@ public class CSVReaderTest {
 		// CsvReader doesn't support custom Value type
 		reader.types(ValueItem.class);
 	}
-	
+
 	private static CsvReader getCsvReader() {
 		return new CsvReader("/some/none/existing/path", ExecutionEnvironment.createLocalEnvironment(1));
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Custom types for testing
 	// --------------------------------------------------------------------------------------------
-	
-	public static class Item extends Tuple4<Integer, String, Double, String> {
+
+	private static class Item extends Tuple4<Integer, String, Double, String> {
 		private static final long serialVersionUID = -7444437337392053502L;
 	}
-	
-	public static class SubItem extends Item {
+
+	private static class SubItem extends Item {
 		private static final long serialVersionUID = 1L;
 	}
-	
-	public static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
+
+	private static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
 		private static final long serialVersionUID = 1L;
 	}
-	
-	public static class FinalItem extends PartialItem<String, StringValue, LongValue> {
+
+	private static class FinalItem extends PartialItem<String, StringValue, LongValue> {
 		private static final long serialVersionUID = 1L;
 	}
 
-	public static class ValueItem implements Value {
+	private static class ValueItem implements Value {
 		private int v1;
 
 		public int getV1() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 4dabaca..77945cc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.api.java.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -33,6 +28,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -47,9 +43,17 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CollectionInputFormat}.
+ */
 public class CollectionInputFormatTest {
-	
-	public static class ElementType {
+
+	private static class ElementType {
 		private final int id;
 
 		public ElementType(){
@@ -73,7 +77,7 @@ public class CollectionInputFormatTest {
 				return false;
 			}
 		}
-		
+
 		@Override
 		public int hashCode() {
 			return id;
@@ -90,7 +94,8 @@ public class CollectionInputFormatTest {
 	@Test
 	public void testSerializability() {
 		try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-			 ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+			ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+
 			Collection<ElementType> inputCollection = new ArrayList<ElementType>();
 			ElementType element1 = new ElementType(1);
 			ElementType element2 = new ElementType(2);
@@ -98,10 +103,10 @@ public class CollectionInputFormatTest {
 			inputCollection.add(element1);
 			inputCollection.add(element2);
 			inputCollection.add(element3);
-	
+
 			@SuppressWarnings("unchecked")
 			TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
-	
+
 			CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
 					info.createSerializer(new ExecutionConfig()));
 
@@ -121,23 +126,23 @@ public class CollectionInputFormatTest {
 			inputFormat.open(inputSplit);
 			result.open(inputSplit);
 
-			while(!inputFormat.reachedEnd() && !result.reachedEnd()){
+			while (!inputFormat.reachedEnd() && !result.reachedEnd()){
 				ElementType expectedElement = inputFormat.nextRecord(null);
 				ElementType actualElement = result.nextRecord(null);
 
 				assertEquals(expectedElement, actualElement);
 			}
 		}
-		catch(Exception e) {
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.toString());
 		}
 
 	}
-	
+
 	@Test
 	public void testSerializabilityStrings() {
-		
+
 		final String[] data = new String[] {
 				"To be, or not to be,--that is the question:--",
 				"Whether 'tis nobler in the mind to suffer",
@@ -175,33 +180,33 @@ public class CollectionInputFormatTest {
 				"The fair Ophelia!--Nymph, in thy orisons",
 				"Be all my sins remember'd."
 		};
-		
+
 		try {
 			List<String> inputCollection = Arrays.asList(data);
 			CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
-			
+
 			// serialize
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
 			ObjectOutputStream oos = new ObjectOutputStream(baos);
 			oos.writeObject(inputFormat);
 			oos.close();
-			
+
 			// deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 			ObjectInputStream ois = new ObjectInputStream(bais);
 			Object result = ois.readObject();
-			
+
 			assertTrue(result instanceof CollectionInputFormat);
-			
+
 			int i = 0;
 			@SuppressWarnings("unchecked")
 			CollectionInputFormat<String> in = (CollectionInputFormat<String>) result;
 			in.open(new GenericInputSplit(0, 1));
-			
+
 			while (!in.reachedEnd()) {
 				assertEquals(data[i++], in.nextRecord(""));
 			}
-			
+
 			assertEquals(data.length, i);
 		}
 		catch (Exception e) {
@@ -209,7 +214,7 @@ public class CollectionInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializationFailure() {
 		try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
@@ -217,7 +222,7 @@ public class CollectionInputFormatTest {
 			// a mock serializer that fails when writing
 			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
 					Collections.singleton(new ElementType()), new TestSerializer(false, true));
-			
+
 			try {
 				out.writeObject(inFormat);
 				fail("should throw an exception");
@@ -234,21 +239,21 @@ public class CollectionInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testDeserializationFailure() {
 		try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-			 ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+			ObjectOutputStream out = new ObjectOutputStream(buffer)) {
 			// a mock serializer that fails when writing
 			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
 					Collections.singleton(new ElementType()), new TestSerializer(true, false));
 
 			out.writeObject(inFormat);
 			out.close();
-			
+
 			ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toByteArray());
 			ObjectInputStream in = new ObjectInputStream(bais);
-			
+
 			try {
 				in.readObject();
 				fail("should throw an exception");
@@ -296,14 +301,14 @@ public class CollectionInputFormatTest {
 	private static class TestException extends IOException{
 		private static final long serialVersionUID = 1L;
 	}
-	
+
 	private static class TestSerializer extends TypeSerializer<ElementType> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private final boolean failOnRead;
 		private final boolean failOnWrite;
-		
+
 		public TestSerializer(boolean failOnRead, boolean failOnWrite) {
 			this.failOnRead = failOnRead;
 			this.failOnWrite = failOnWrite;