You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 16:23:02 UTC
[2/4] flink git commit: [FLINK-7185] Activate checkstyle flink-java/io
[FLINK-7185] Activate checkstyle flink-java/io
This closes #4340.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9c9fb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9c9fb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9c9fb5
Branch: refs/heads/master
Commit: 0c9c9fb5cb7a8a27d444db5c725c8abd792ca761
Parents: d578810
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:31:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 16:37:52 2017 +0200
----------------------------------------------------------------------
.../api/java/io/CollectionInputFormat.java | 40 ++--
.../flink/api/java/io/CsvInputFormat.java | 17 +-
.../flink/api/java/io/CsvOutputFormat.java | 26 +--
.../org/apache/flink/api/java/io/CsvReader.java | 97 ++++----
.../api/java/io/DiscardingOutputFormat.java | 1 -
.../flink/api/java/io/IteratorInputFormat.java | 12 +-
.../java/io/LocalCollectionOutputFormat.java | 21 +-
.../java/io/ParallelIteratorInputFormat.java | 20 +-
.../flink/api/java/io/PojoCsvInputFormat.java | 5 +
.../flink/api/java/io/PrimitiveInputFormat.java | 5 +-
.../flink/api/java/io/PrintingOutputFormat.java | 30 +--
.../flink/api/java/io/RowCsvInputFormat.java | 3 +
.../flink/api/java/io/SplitDataProperties.java | 85 ++++---
.../flink/api/java/io/TextInputFormat.java | 50 ++--
.../flink/api/java/io/TextOutputFormat.java | 50 ++--
.../flink/api/java/io/TextValueInputFormat.java | 57 ++---
.../flink/api/java/io/TupleCsvInputFormat.java | 9 +-
.../api/java/io/TypeSerializerOutputFormat.java | 6 +-
.../apache/flink/api/java/io/CSVReaderTest.java | 90 ++++----
.../api/java/io/CollectionInputFormatTest.java | 67 +++---
.../flink/api/java/io/CsvInputFormatTest.java | 230 ++++++++++---------
.../flink/api/java/io/CsvOutputFormatTest.java | 7 +-
.../flink/api/java/io/FromElementsTest.java | 8 +-
.../api/java/io/PrimitiveInputFormatTest.java | 35 ++-
.../api/java/io/RowCsvInputFormatTest.java | 15 +-
.../flink/api/java/io/TextInputFormatTest.java | 129 +++++------
.../api/java/io/TypeSerializerFormatTest.java | 6 +-
tools/maven/suppressions-java.xml | 7 -
28 files changed, 587 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 90e6712..eebe56f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -18,14 +18,6 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
@@ -34,6 +26,14 @@ import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
/**
* An input format that returns objects from a collection.
*/
@@ -55,7 +55,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
}
this.serializer = serializer;
-
+
this.dataSet = dataSet;
}
@@ -67,10 +67,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
@Override
public void open(GenericInputSplit split) throws IOException {
super.open(split);
-
+
this.iterator = this.dataSet.iterator();
}
-
+
@Override
public T nextRecord(T record) throws IOException {
return this.iterator.next();
@@ -80,10 +80,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
-
+
final int size = dataSet.size();
out.writeInt(size);
-
+
if (size > 0) {
DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
for (T element : dataSet){
@@ -97,7 +97,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
-
+
if (collectionLength > 0) {
try {
DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(in);
@@ -113,9 +113,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
dataSet = list;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -136,14 +136,14 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
sb.append(']');
return sb.toString();
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public static <X> void checkCollection(Collection<X> elements, Class<X> viewedAs) {
if (elements == null || viewedAs == null) {
throw new NullPointerException();
}
-
+
for (X elem : elements) {
if (elem == null) {
throw new IllegalArgumentException("The collection must not contain null elements.");
@@ -157,7 +157,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
if (!viewedAs.isAssignableFrom(elem.getClass()) &&
!(elem.getClass().toString().equals("class scala.runtime.BoxedUnit") && viewedAs.equals(void.class))) {
- throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
+ throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
viewedAs.getCanonicalName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index f1a16ea..0bd4e69 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -21,13 +21,18 @@ package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import java.io.IOException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
+/**
+ * InputFormat that reads csv files.
+ *
+ * @param <OUT> The type of the records produced by this input format.
+ */
@Internal
public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
@@ -38,7 +43,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
public static final String DEFAULT_FIELD_DELIMITER = ",";
protected transient Object[] parsedValues;
-
+
protected CsvInputFormat(Path filePath) {
super(filePath);
}
@@ -63,7 +68,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
// left to right evaluation makes access [0] okay
// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
- if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+ if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n') {
this.lineDelimiterIsLinebreak = true;
}
@@ -123,7 +128,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
protected static boolean[] createDefaultMask(int size) {
boolean[] includedMask = new boolean[size];
- for (int x=0; x<includedMask.length; x++) {
+ for (int x = 0; x < includedMask.length; x++) {
includedMask[x] = true;
}
return includedMask;
@@ -154,5 +159,5 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
public String toString() {
return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index c2fe13c..44fd679 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -16,18 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +27,15 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
/**
* This is an OutputFormat to serialize {@link org.apache.flink.api.java.tuple.Tuple}s to text. The output is
* structured by record delimiters and field delimiters as common in CSV files.
@@ -121,8 +122,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
/**
* Configures the format to either allow null values (writing an empty field),
* or to throw an exception when encountering a null field.
- * <p>
- * by default, null values are disallowed.
+ *
+ * <p>By default, null values are disallowed.
*
* @param allowNulls Flag to indicate whether the output format should accept null values.
*/
@@ -144,8 +145,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
* Configures whether the output format should quote string values. String values are fields
* of type {@link java.lang.String} and {@link org.apache.flink.types.StringValue}, as well as
* all subclasses of the latter.
- * <p>
- * By default, strings are not quoted.
+ *
+ * <p>By default, strings are not quoted.
*
* @param quoteStrings Flag indicating whether string fields should be quoted.
*/
@@ -215,7 +216,6 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
}
/**
- *
* The purpose of this method is solely to check whether the data type to be processed
* is in fact a tuple type.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index ce2f4fa..684911a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -23,9 +23,9 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSource;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -44,14 +44,13 @@ import java.util.Arrays;
public class CsvReader {
private final Path path;
-
+
private final ExecutionEnvironment executionContext;
-
-
+
protected boolean[] includedMask;
-
+
protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
-
+
protected String fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
protected String commentPrefix = null; //default: no comments
@@ -61,35 +60,35 @@ public class CsvReader {
protected char quoteCharacter = '"';
protected boolean skipFirstLineAsHeader = false;
-
+
protected boolean ignoreInvalidLines = false;
private String charset = "UTF-8";
-
+
// --------------------------------------------------------------------------------------------
-
+
public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
Preconditions.checkNotNull(executionContext, "The execution context may not be null.");
-
+
this.path = filePath;
this.executionContext = executionContext;
}
-
+
public CsvReader(String filePath, ExecutionEnvironment executionContext) {
this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
}
-
+
public Path getFilePath() {
return this.path;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
/**
* Configures the delimiter that separates the lines/rows. The linebreak character
* ({@code '\n'}) is used by default.
- *
+ *
* @param delimiter The delimiter that separates the rows.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -97,15 +96,15 @@ public class CsvReader {
if (delimiter == null || delimiter.length() == 0) {
throw new IllegalArgumentException("The delimiter must not be null or an empty string");
}
-
+
this.lineDelimiter = delimiter;
return this;
}
-
+
/**
* Configures the delimiter that separates the fields within a row. The comma character
* ({@code ','}) is used by default.
- *
+ *
* @param delimiter The delimiter that separates the fields in one row.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*
@@ -148,7 +147,7 @@ public class CsvReader {
* Configures the string that starts comments.
* By default comments will be treated as invalid lines.
* This function only recognizes comments which start at the beginning of the line!
- *
+ *
* @param commentPrefix The string that starts the comments.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -156,7 +155,7 @@ public class CsvReader {
if (commentPrefix == null || commentPrefix.length() == 0) {
throw new IllegalArgumentException("The comment prefix must not be null or an empty string");
}
-
+
this.commentPrefix = commentPrefix;
return this;
}
@@ -172,7 +171,7 @@ public class CsvReader {
}
/**
- * Sets the charset of the reader
+ * Sets the charset of the reader.
*
* @param charset The character set to set.
*/
@@ -189,7 +188,7 @@ public class CsvReader {
* the boolean array is {@code true}.
* The number of fields in the result is consequently equal to the number of times that {@code true}
* occurs in the fields array.
- *
+ *
* @param fields The array of flags that describes which fields are to be included and which not.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -197,14 +196,14 @@ public class CsvReader {
if (fields == null || fields.length == 0) {
throw new IllegalArgumentException("The set of included fields must not be null or empty.");
}
-
+
int lastTruePos = -1;
for (int i = 0; i < fields.length; i++) {
if (fields[i]) {
lastTruePos = i;
}
}
-
+
if (lastTruePos == -1) {
throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
}
@@ -225,13 +224,13 @@ public class CsvReader {
* in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
* {@code false}). The result contains the fields where the corresponding position in
* the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
- *
+ *
* @param mask The string mask defining which fields to include and which to skip.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
public CsvReader includeFields(String mask) {
boolean[] includedMask = new boolean[mask.length()];
-
+
for (int i = 0; i < mask.length(); i++) {
char c = mask.charAt(i);
if (c == '1' || c == 'T' || c == 't') {
@@ -240,10 +239,10 @@ public class CsvReader {
throw new IllegalArgumentException("Mask string may contain only '0' and '1'.");
}
}
-
+
return includeFields(includedMask);
}
-
+
/**
* Configures which fields of the CSV file should be included and which should be skipped. The
* bits in the value (read from least significant to most significant) define whether the field at
@@ -252,14 +251,14 @@ public class CsvReader {
* non-zero bit.
* The parser will skip over all fields where the character at the corresponding bit is zero, and
* include the fields where the corresponding bit is one.
- * <p>
- * Examples:
+ *
+ * <p>Examples:
* <ul>
* <li>A mask of {@code 0x7} would include the first three fields.</li>
* <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
* two and three, skip fields four and five, and include field six.</li>
* </ul>
- *
+ *
* @param mask The bit mask defining which fields to include and which to skip.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -267,36 +266,36 @@ public class CsvReader {
if (mask == 0) {
throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
}
-
+
ArrayList<Boolean> fields = new ArrayList<Boolean>();
while (mask != 0) {
fields.add((mask & 0x1L) != 0);
mask >>>= 1;
}
-
+
boolean[] fieldsArray = new boolean[fields.size()];
for (int i = 0; i < fieldsArray.length; i++) {
fieldsArray[i] = fields.get(i);
}
-
+
return includeFields(fieldsArray);
}
/**
* Sets the CSV reader to ignore the first line. This is useful for files that contain a header line.
- *
+ *
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
public CsvReader ignoreFirstLine() {
skipFirstLineAsHeader = true;
return this;
}
-
+
/**
- * Sets the CSV reader to ignore any invalid lines.
+ * Sets the CSV reader to ignore any invalid lines.
* This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
- *
+ *
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
public CsvReader ignoreInvalidLines(){
@@ -325,12 +324,12 @@ public class CsvReader {
return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
}
-
+
/**
* Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
* {@link Tuple}. The type information for the fields is obtained from the type class. The type
* consequently needs to specify all generic field types of the tuple.
- *
+ *
* @param targetType The class of the target type, needs to be a subclass of Tuple.
* @return The DataSet representing the parsed CSV data.
*/
@@ -339,24 +338,24 @@ public class CsvReader {
if (!Tuple.class.isAssignableFrom(targetType)) {
throw new IllegalArgumentException("The target type must be a subclass of " + Tuple.class.getName());
}
-
+
@SuppressWarnings("unchecked")
TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
CsvInputFormat<T> inputFormat = new TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask);
-
+
Class<?>[] classes = new Class<?>[typeInfo.getArity()];
for (int i = 0; i < typeInfo.getArity(); i++) {
classes[i] = typeInfo.getTypeAt(i).getTypeClass();
}
-
+
configureInputFormat(inputFormat);
return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
}
-
+
// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
-
+
private void configureInputFormat(CsvInputFormat<?> format) {
format.setCharset(this.charset);
format.setDelimiter(this.lineDelimiter);
@@ -368,10 +367,10 @@ public class CsvReader {
format.enableQuotedStringParsing(this.quoteCharacter);
}
}
-
- // --------------------------------------------------------------------------------------------
+
+ // --------------------------------------------------------------------------------------------
// The following lines are generated.
- // --------------------------------------------------------------------------------------------
+ // --------------------------------------------------------------------------------------------
// BEGIN_OF_TUPLE_DEPENDENT_CODE
// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
index f01d864..7358b14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Public;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
index cb8bd6a..05f3ccd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
-import java.io.Serializable;
-import java.util.Iterator;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
+import java.io.Serializable;
+import java.util.Iterator;
+
/**
* An input format that returns objects from an iterator.
*/
@@ -35,13 +34,12 @@ public class IteratorInputFormat<T> extends GenericInputFormat<T> implements Non
private static final long serialVersionUID = 1L;
private Iterator<T> iterator; // input data as serializable iterator
-
-
+
public IteratorInputFormat(Iterator<T> iterator) {
if (!(iterator instanceof Serializable)) {
throw new IllegalArgumentException("The data source iterator must be serializable.");
}
-
+
this.iterator = iterator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
index 65ed6c3..bcb1cf1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
@@ -18,13 +18,6 @@
package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.RichOutputFormat;
@@ -33,15 +26,21 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
/**
- * An output format that writes record into collection
+ * An output format that adds records to a collection.
*/
@PublicEvolving
public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implements InputTypeConfigurable {
private static final long serialVersionUID = 1L;
- private static Map<Integer,Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
+ private static final Map<Integer, Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
private transient ArrayList<T> taskResult;
@@ -67,7 +66,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
@Override
public void configure(Configuration parameters) {}
-
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.taskResult = new ArrayList<T>();
@@ -80,7 +78,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
this.taskResult.add(recordCopy);
}
-
@Override
public void close() throws IOException {
synchronized (RESULT_HOLDER) {
@@ -93,6 +90,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- this.typeSerializer = (TypeSerializer<T>)type.createSerializer(executionConfig);
+ this.typeSerializer = (TypeSerializer<T>) type.createSerializer(executionConfig);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
index a6ac853..25e25b4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
@@ -18,14 +18,13 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.util.SplittableIterator;
+import java.io.IOException;
+import java.util.Iterator;
/**
* An input format that generates data in parallel through a {@link SplittableIterator}.
@@ -34,25 +33,22 @@ import org.apache.flink.util.SplittableIterator;
public class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> {
private static final long serialVersionUID = 1L;
-
-
+
private final SplittableIterator<T> source;
-
+
private transient Iterator<T> splitIterator;
-
-
-
+
public ParallelIteratorInputFormat(SplittableIterator<T> iterator) {
this.source = iterator;
}
-
+
@Override
public void open(GenericInputSplit split) throws IOException {
super.open(split);
-
+
this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits());
}
-
+
@Override
public boolean reachedEnd() {
return !this.splitIterator.hasNext();
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
index 990e9e6..804d02b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Internal;
@@ -29,6 +30,10 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Input format that reads csv into POJOs.
+ * @param <OUT> resulting POJO type
+ */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index d454765..794703b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -44,7 +44,6 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
private transient FieldParser<OT> parser;
-
public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
super(filePath, null);
this.primitiveClass = primitiveClass;
@@ -70,7 +69,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
// Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
- && offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+ && offset + numBytes >= 1 && bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
numBytes -= 1;
}
@@ -79,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
return parser.getLastResult();
} else {
String s = new String(bytes, offset, numBytes, getCharset());
- throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
+ throw new IOException("Could not parse value: \"" + s + "\" as type " + primitiveClass.getSimpleName());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index a010fd8..0ab1abb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -18,12 +18,16 @@
package org.apache.flink.api.java.io;
-import java.io.PrintStream;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
+import java.io.PrintStream;
+
+/**
+ * Output format that prints results into either stdout or stderr.
+ * @param <T>
+ */
@PublicEvolving
public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
@@ -37,19 +41,19 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
private boolean target;
private transient PrintStream stream;
-
+
private transient String prefix;
-
+
// --------------------------------------------------------------------------------------------
-
+
/**
* Instantiates a printing output format that prints to standard out.
*/
public PrintingOutputFormat() {}
-
+
/**
* Instantiates a printing output format that prints to standard out.
- *
+ *
* @param stdErr True, if the format should print to standard error instead of standard out.
*/
public PrintingOutputFormat(boolean stdErr) {
@@ -65,20 +69,18 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
this(stdErr);
this.sinkIdentifier = sinkIdentifier;
}
-
+
public void setTargetToStandardOut() {
this.target = STD_OUT;
}
-
+
public void setTargetToStandardErr() {
this.target = STD_ERR;
}
-
-
+
@Override
public void configure(Configuration parameters) {}
-
@Override
public void open(int taskNumber, int numTasks) {
// get the target stream
@@ -116,9 +118,9 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
this.prefix = null;
this.sinkIdentifier = null;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index b752966..15ef90e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -29,6 +29,9 @@ import org.apache.flink.types.parser.FieldParser;
import java.util.Arrays;
+/**
+ * Input format that reads csv into {@link Row}.
+ */
@PublicEvolving
public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultTypeQueryable<Row> {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
index db09380..c1487f6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.common.operators.Keys;
import java.util.Arrays;
@@ -34,7 +34,7 @@ import java.util.Arrays;
* SplitDataProperties define data properties on {@link org.apache.flink.core.io.InputSplit}
* generated by the {@link org.apache.flink.api.common.io.InputFormat} of a {@link DataSource}.
*
- * InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
+ * <p>InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
* SplitDataProperties can define that the elements which are generated by the associated InputFormat
* are
* <ul>
@@ -46,7 +46,7 @@ import java.util.Arrays;
* are in the defined order.</li>
* </ul>
*
- * <b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
+ * <p><b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
* data reorganization steps such as shuffling or sorting can be avoided.
* HOWEVER, if SplitDataProperties are not correctly defined, the result of the program might be wrong!</b>
*
@@ -90,8 +90,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
/**
* Defines that data is partitioned across input splits on the fields defined by field positions.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -106,8 +106,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that data is partitioned using a specific partitioning method
* across input splits on the fields defined by field positions.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -137,8 +137,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that data is partitioned across input splits on the fields defined by field expressions.
* Multiple field expressions must be separated by the semicolon ';' character.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -154,8 +154,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* across input splits on the fields defined by field expressions.
* Multiple field expressions must be separated by the semicolon ';' character.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -165,7 +165,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, String partitionFields) {
- if(partitionFields == null) {
+ if (partitionFields == null) {
throw new InvalidProgramException("PartitionFields may not be null.");
}
@@ -175,7 +175,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
}
this.splitPartitionKeys = getAllFlatKeys(partitionKeysA);
- if(partitionMethodId != null) {
+ if (partitionMethodId != null) {
this.splitPartitioner = new SourcePartitionerMarker<>(partitionMethodId);
}
else {
@@ -189,8 +189,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that the data within an input split is grouped on the fields defined by the field positions.
* All records sharing the same key (combination) must be subsequently emitted by the input
* format for each input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -199,13 +199,13 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsGroupedBy(int... groupFields) {
- if(groupFields == null) {
+ if (groupFields == null) {
throw new InvalidProgramException("GroupFields may not be null.");
} else if (groupFields.length == 0) {
throw new InvalidProgramException("GroupFields may not be empty.");
}
- if(this.splitOrdering != null) {
+ if (this.splitOrdering != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
@@ -219,8 +219,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Multiple field expressions must be separated by the semicolon ';' character.
* All records sharing the same key (combination) must be subsequently emitted by the input
* format for each input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -229,7 +229,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsGroupedBy(String groupFields) {
- if(groupFields == null) {
+ if (groupFields == null) {
throw new InvalidProgramException("GroupFields may not be null.");
}
@@ -238,7 +238,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
throw new InvalidProgramException("GroupFields may not be empty.");
}
- if(this.splitOrdering != null) {
+ if (this.splitOrdering != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
@@ -251,8 +251,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that the data within an input split is sorted on the fields defined by the field positions
* in the specified orders.
* All records of an input split must be emitted by the input format in the defined order.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -262,7 +262,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsOrderedBy(int[] orderFields, Order[] orders) {
- if(orderFields == null || orders == null) {
+ if (orderFields == null || orders == null) {
throw new InvalidProgramException("OrderFields or Orders may not be null.");
} else if (orderFields.length == 0) {
throw new InvalidProgramException("OrderFields may not be empty.");
@@ -272,17 +272,17 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
throw new InvalidProgramException("Number of OrderFields and Orders must match.");
}
- if(this.splitGroupKeys != null) {
+ if (this.splitGroupKeys != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
this.splitOrdering = new Ordering();
- for(int i=0; i<orderFields.length; i++) {
+ for (int i = 0; i < orderFields.length; i++) {
int pos = orderFields[i];
int[] flatKeys = this.getAllFlatKeys(new int[]{pos});
- for(int key : flatKeys) {
+ for (int key : flatKeys) {
// check for duplicates
for (int okey : splitOrdering.getFieldPositions()) {
if (key == okey) {
@@ -290,7 +290,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
}
}
// append key
- this.splitOrdering.appendOrdering(key, null, orders[i] );
+ this.splitOrdering.appendOrdering(key, null, orders[i]);
}
}
return this;
@@ -300,8 +300,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that the data within an input split is sorted on the fields defined by the field expressions
* in the specified orders. Multiple field expressions must be separated by the semicolon ';' character.
* All records of an input split must be emitted by the input format in the defined order.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -311,7 +311,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsOrderedBy(String orderFields, Order[] orders) {
- if(orderFields == null || orders == null) {
+ if (orderFields == null || orders == null) {
throw new InvalidProgramException("OrderFields or Orders may not be null.");
}
@@ -324,18 +324,18 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
throw new InvalidProgramException("Number of OrderFields and Orders must match.");
}
- if(this.splitGroupKeys != null) {
+ if (this.splitGroupKeys != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
this.splitOrdering = new Ordering();
- for(int i=0; i<orderKeysA.length; i++) {
+ for (int i = 0; i < orderKeysA.length; i++) {
String keyExp = orderKeysA[i];
Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
int[] flatKeys = ek.computeLogicalKeyPositions();
- for(int key : flatKeys) {
+ for (int key : flatKeys) {
// check for duplicates
for (int okey : splitOrdering.getFieldPositions()) {
if (key == okey) {
@@ -343,7 +343,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
}
}
// append key
- this.splitOrdering.appendOrdering(key, null, orders[i] );
+ this.splitOrdering.appendOrdering(key, null, orders[i]);
}
}
return this;
@@ -365,25 +365,24 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
return this.splitOrdering;
}
-
/////////////////////// FLAT FIELD EXTRACTION METHODS
private int[] getAllFlatKeys(String[] fieldExpressions) {
int[] allKeys = null;
- for(String keyExp : fieldExpressions) {
+ for (String keyExp : fieldExpressions) {
Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
int[] flatKeys = ek.computeLogicalKeyPositions();
- if(allKeys == null) {
+ if (allKeys == null) {
allKeys = flatKeys;
} else {
// check for duplicates
- for(int key1 : flatKeys) {
- for(int key2 : allKeys) {
- if(key1 == key2) {
- throw new InvalidProgramException("Duplicate fields in field expression "+keyExp);
+ for (int key1 : flatKeys) {
+ for (int key2 : allKeys) {
+ if (key1 == key2) {
+ throw new InvalidProgramException("Duplicate fields in field expression " + keyExp);
}
}
}
@@ -425,7 +424,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
@Override
public boolean equals(Object o) {
- if(o instanceof SourcePartitionerMarker) {
+ if (o instanceof SourcePartitionerMarker) {
return this.partitionMarker.equals(((SourcePartitionerMarker<?>) o).partitionMarker);
} else {
return false;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index b2554bf..7d4cf9c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -18,61 +18,63 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.nio.charset.Charset;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Input Format that reads text files. Each line results in another element.
+ */
@PublicEvolving
public class TextInputFormat extends DelimitedInputFormat<String> {
-
+
private static final long serialVersionUID = 1L;
-
+
/**
- * Code of \r, used to remove \r from a line when the line ends with \r\n
+ * Code of \r, used to remove \r from a line when the line ends with \r\n.
*/
private static final byte CARRIAGE_RETURN = (byte) '\r';
/**
- * Code of \n, used to identify if \n is used as delimiter
+ * Code of \n, used to identify if \n is used as delimiter.
*/
private static final byte NEW_LINE = (byte) '\n';
-
-
+
/**
* The name of the charset to use for decoding.
*/
private String charsetName = "UTF-8";
-
+
// --------------------------------------------------------------------------------------------
-
+
public TextInputFormat(Path filePath) {
super(filePath, null);
}
-
- // --------------------------------------------------------------------------------------------
-
+
+ // --------------------------------------------------------------------------------------------
+
public String getCharsetName() {
return charsetName;
}
-
+
public void setCharsetName(String charsetName) {
if (charsetName == null) {
throw new IllegalArgumentException("Charset must not be null.");
}
-
+
this.charsetName = charsetName;
}
-
+
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
super.configure(parameters);
-
+
if (charsetName == null || !Charset.isSupported(charsetName)) {
throw new RuntimeException("Unsupported charset: " + charsetName);
}
@@ -83,17 +85,17 @@ public class TextInputFormat extends DelimitedInputFormat<String> {
@Override
public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
- if (this.getDelimiter() != null && this.getDelimiter().length == 1
- && this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1
- && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+ if (this.getDelimiter() != null && this.getDelimiter().length == 1
+ && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
+ && bytes[offset + numBytes - 1] == CARRIAGE_RETURN){
numBytes -= 1;
}
-
+
return new String(bytes, offset, numBytes, this.charsetName);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "TextInputFormat (" + getFilePath() + ") - " + this.charsetName;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
index d466082..006b571 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
@@ -18,65 +18,75 @@
package org.apache.flink.api.java.io;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
+/**
+ * A {@link FileOutputFormat} that writes objects to a text file.
+ *
+ * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
+ * @param <T> type of elements
+ */
@PublicEvolving
public class TextOutputFormat<T> extends FileOutputFormat<T> {
private static final long serialVersionUID = 1L;
-
+
private static final int NEWLINE = '\n';
private String charsetName;
-
+
private transient Charset charset;
// --------------------------------------------------------------------------------------------
- public static interface TextFormatter<IN> extends Serializable {
- public String format(IN value);
+
+ /**
+ * Formatter that transforms values into their {@link String} representations.
+ * @param <IN> type of input elements
+ */
+ public interface TextFormatter<IN> extends Serializable {
+ String format(IN value);
}
public TextOutputFormat(Path outputPath) {
this(outputPath, "UTF-8");
}
-
+
public TextOutputFormat(Path outputPath, String charset) {
super(outputPath);
this.charsetName = charset;
}
-
-
+
public String getCharsetName() {
return charsetName;
}
-
+
public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
if (charsetName == null) {
throw new NullPointerException();
}
-
+
if (!Charset.isSupported(charsetName)) {
throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
}
-
+
this.charsetName = charsetName;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
-
+
try {
this.charset = Charset.forName(charsetName);
}
@@ -87,16 +97,16 @@ public class TextOutputFormat<T> extends FileOutputFormat<T> {
throw new IOException("The charset " + charsetName + " is not supported.", e);
}
}
-
+
@Override
public void writeRecord(T record) throws IOException {
byte[] bytes = record.toString().getBytes(charset);
this.stream.write(bytes);
this.stream.write(NEWLINE);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
index 45a2e3e..4721439 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
@@ -18,6 +18,12 @@
package org.apache.flink.api.java.io;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.StringValue;
+
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
@@ -26,61 +32,58 @@ import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.StringValue;
-
+/**
+ * Input format that reads text files.
+ */
@PublicEvolving
public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
-
+
private static final long serialVersionUID = 1L;
-
+
private String charsetName = "UTF-8";
-
+
private boolean skipInvalidLines;
-
+
private transient CharsetDecoder decoder;
-
+
private transient ByteBuffer byteWrapper;
-
+
private transient boolean ascii;
-
+
// --------------------------------------------------------------------------------------------
-
+
public TextValueInputFormat(Path filePath) {
super(filePath, null);
}
-
- // --------------------------------------------------------------------------------------------
-
+
+ // --------------------------------------------------------------------------------------------
+
public String getCharsetName() {
return charsetName;
}
-
+
public void setCharsetName(String charsetName) {
if (charsetName == null) {
throw new IllegalArgumentException("The charset name may not be null.");
}
-
+
this.charsetName = charsetName;
}
-
+
public boolean isSkipInvalidLines() {
return skipInvalidLines;
}
-
+
public void setSkipInvalidLines(boolean skipInvalidLines) {
this.skipInvalidLines = skipInvalidLines;
}
-
+
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
super.configure(parameters);
-
+
if (charsetName == null || !Charset.isSupported(charsetName)) {
throw new RuntimeException("Unsupported charset: " + charsetName);
}
@@ -88,7 +91,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
if (charsetName.equalsIgnoreCase(StandardCharsets.US_ASCII.name())) {
ascii = true;
}
-
+
this.decoder = Charset.forName(charsetName).newDecoder();
this.byteWrapper = ByteBuffer.allocate(1);
}
@@ -109,7 +112,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
}
byteWrapper.limit(offset + numBytes);
byteWrapper.position(offset);
-
+
try {
CharBuffer result = this.decoder.decode(byteWrapper);
reuse.setValue(result);
@@ -126,9 +129,9 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
}
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "TextValueInputFormat (" + getFilePath() + ") - " + this.charsetName + (this.skipInvalidLines ? "(skipping invalid lines)" : "");
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
index 6efd566..887620a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -15,15 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.Path;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.core.fs.Path;
+/**
+ * Input format that reads csv into tuples.
+ */
@Internal
public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
@@ -59,7 +62,7 @@ public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
super(filePath);
configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
}
-
+
private void configure(String lineDelimiter, String fieldDelimiter,
TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
index 81a142e..108448d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
@@ -36,16 +36,16 @@ import java.io.IOException;
public class TypeSerializerOutputFormat<T> extends BinaryOutputFormat<T> implements InputTypeConfigurable {
private static final long serialVersionUID = -6653022644629315158L;
-
+
private TypeSerializer<T> serializer;
@Override
protected void serialize(T record, DataOutputView dataOutput) throws IOException {
- if(serializer == null){
+ if (serializer == null){
throw new RuntimeException("TypeSerializerOutputFormat requires a type serializer to " +
"be defined.");
}
-
+
serializer.serialize(record, dataOutput);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index de57e5c..5622930 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
+
import org.junit.Assert;
import org.junit.Test;
@@ -59,7 +60,7 @@ public class CSVReaderTest {
reader.ignoreFirstLine();
Assert.assertTrue(reader.skipFirstLineAsHeader);
}
-
+
@Test
public void testIgnoreInvalidLinesConfigure() {
CsvReader reader = getCsvReader();
@@ -67,7 +68,7 @@ public class CSVReaderTest {
reader.ignoreInvalidLines();
Assert.assertTrue(reader.ignoreInvalidLines);
}
-
+
@Test
public void testIgnoreComments() {
CsvReader reader = getCsvReader();
@@ -89,38 +90,38 @@ public class CSVReaderTest {
CsvReader reader = getCsvReader();
reader.includeFields(true, true, true);
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("ttt");
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("TTT");
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("111");
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields(0x7L);
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
}
-
+
@Test
public void testIncludeFieldsSparse() {
CsvReader reader = getCsvReader();
reader.includeFields(false, true, true, false, false, true, false, false);
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("fttfftff");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("FTTFFTFF");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("01100100");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
@@ -128,16 +129,16 @@ public class CSVReaderTest {
reader = getCsvReader();
reader.includeFields("0t1f0TFF");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields(0x26L);
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
}
-
+
@Test
public void testIllegalCharInStringMask() {
CsvReader reader = getCsvReader();
-
+
try {
reader.includeFields("1t0Tfht");
Assert.fail("Reader accepted an invalid mask string");
@@ -146,12 +147,11 @@ public class CSVReaderTest {
// expected
}
}
-
-
+
@Test
public void testIncludeFieldsErrorWhenExcludingAll() {
CsvReader reader = getCsvReader();
-
+
try {
reader.includeFields(false, false, false, false, false, false);
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -159,7 +159,7 @@ public class CSVReaderTest {
catch (IllegalArgumentException e) {
// all good
}
-
+
try {
reader.includeFields(0);
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -167,7 +167,7 @@ public class CSVReaderTest {
catch (IllegalArgumentException e) {
// all good
}
-
+
try {
reader.includeFields("ffffffffffffff");
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -175,7 +175,7 @@ public class CSVReaderTest {
catch (IllegalArgumentException e) {
// all good
}
-
+
try {
reader.includeFields("00000000000000000");
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -191,12 +191,12 @@ public class CSVReaderTest {
DataSource<Item> items = reader.tupleType(Item.class);
Assert.assertTrue(items.getType().getTypeClass() == Item.class);
}
-
+
@Test
public void testFieldTypes() throws Exception {
CsvReader reader = getCsvReader();
DataSource<Item> items = reader.tupleType(Item.class);
-
+
TypeInformation<?> info = items.getType();
if (!info.isTupleType()) {
Assert.fail();
@@ -208,44 +208,44 @@ public class CSVReaderTest {
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
}
-
+
CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) items.getInputFormat();
Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
}
-
+
@Test
public void testSubClass() throws Exception {
CsvReader reader = getCsvReader();
DataSource<SubItem> sitems = reader.tupleType(SubItem.class);
TypeInformation<?> info = sitems.getType();
-
+
Assert.assertEquals(true, info.isTupleType());
Assert.assertEquals(SubItem.class, info.getTypeClass());
-
+
@SuppressWarnings("unchecked")
TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-
+
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
-
+
CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
}
-
+
@Test
public void testSubClassWithPartialsInHierarchie() throws Exception {
CsvReader reader = getCsvReader();
DataSource<FinalItem> sitems = reader.tupleType(FinalItem.class);
TypeInformation<?> info = sitems.getType();
-
+
Assert.assertEquals(true, info.isTupleType());
Assert.assertEquals(FinalItem.class, info.getTypeClass());
-
+
@SuppressWarnings("unchecked")
TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-
+
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
@@ -253,15 +253,15 @@ public class CSVReaderTest {
Assert.assertEquals(ValueTypeInfo.class, tinfo.getTypeAt(4).getClass());
Assert.assertEquals(StringValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(3)).getTypeClass());
Assert.assertEquals(LongValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(4)).getTypeClass());
-
+
CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
Assert.assertArrayEquals(new Class<?>[] {Integer.class, String.class, Double.class, StringValue.class, LongValue.class}, inputFormat.getFieldTypes());
}
-
+
@Test
public void testUnsupportedPartialitem() throws Exception {
CsvReader reader = getCsvReader();
-
+
try {
reader.tupleType(PartialItem.class);
Assert.fail("tupleType() accepted an underspecified generic class.");
@@ -295,32 +295,32 @@ public class CSVReaderTest {
// CsvReader doesn't support custom Value type
reader.types(ValueItem.class);
}
-
+
private static CsvReader getCsvReader() {
return new CsvReader("/some/none/existing/path", ExecutionEnvironment.createLocalEnvironment(1));
}
-
+
// --------------------------------------------------------------------------------------------
// Custom types for testing
// --------------------------------------------------------------------------------------------
-
- public static class Item extends Tuple4<Integer, String, Double, String> {
+
+ private static class Item extends Tuple4<Integer, String, Double, String> {
private static final long serialVersionUID = -7444437337392053502L;
}
-
- public static class SubItem extends Item {
+
+ private static class SubItem extends Item {
private static final long serialVersionUID = 1L;
}
-
- public static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
+
+ private static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
private static final long serialVersionUID = 1L;
}
-
- public static class FinalItem extends PartialItem<String, StringValue, LongValue> {
+
+ private static class FinalItem extends PartialItem<String, StringValue, LongValue> {
private static final long serialVersionUID = 1L;
}
- public static class ValueItem implements Value {
+ private static class ValueItem implements Value {
private int v1;
public int getV1() {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 4dabaca..77945cc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -18,11 +18,6 @@
package org.apache.flink.api.java.io;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -33,6 +28,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -47,9 +43,17 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CollectionInputFormat}.
+ */
public class CollectionInputFormatTest {
-
- public static class ElementType {
+
+ private static class ElementType {
private final int id;
public ElementType(){
@@ -73,7 +77,7 @@ public class CollectionInputFormatTest {
return false;
}
}
-
+
@Override
public int hashCode() {
return id;
@@ -90,7 +94,8 @@ public class CollectionInputFormatTest {
@Test
public void testSerializability() {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+ ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+
Collection<ElementType> inputCollection = new ArrayList<ElementType>();
ElementType element1 = new ElementType(1);
ElementType element2 = new ElementType(2);
@@ -98,10 +103,10 @@ public class CollectionInputFormatTest {
inputCollection.add(element1);
inputCollection.add(element2);
inputCollection.add(element3);
-
+
@SuppressWarnings("unchecked")
TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
-
+
CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
info.createSerializer(new ExecutionConfig()));
@@ -121,23 +126,23 @@ public class CollectionInputFormatTest {
inputFormat.open(inputSplit);
result.open(inputSplit);
- while(!inputFormat.reachedEnd() && !result.reachedEnd()){
+ while (!inputFormat.reachedEnd() && !result.reachedEnd()){
ElementType expectedElement = inputFormat.nextRecord(null);
ElementType actualElement = result.nextRecord(null);
assertEquals(expectedElement, actualElement);
}
}
- catch(Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
-
+
@Test
public void testSerializabilityStrings() {
-
+
final String[] data = new String[] {
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
@@ -175,33 +180,33 @@ public class CollectionInputFormatTest {
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
};
-
+
try {
List<String> inputCollection = Arrays.asList(data);
CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
-
+
// serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(inputFormat);
oos.close();
-
+
// deserialize
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bais);
Object result = ois.readObject();
-
+
assertTrue(result instanceof CollectionInputFormat);
-
+
int i = 0;
@SuppressWarnings("unchecked")
CollectionInputFormat<String> in = (CollectionInputFormat<String>) result;
in.open(new GenericInputSplit(0, 1));
-
+
while (!in.reachedEnd()) {
assertEquals(data[i++], in.nextRecord(""));
}
-
+
assertEquals(data.length, i);
}
catch (Exception e) {
@@ -209,7 +214,7 @@ public class CollectionInputFormatTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testSerializationFailure() {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
@@ -217,7 +222,7 @@ public class CollectionInputFormatTest {
// a mock serializer that fails when writing
CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
Collections.singleton(new ElementType()), new TestSerializer(false, true));
-
+
try {
out.writeObject(inFormat);
fail("should throw an exception");
@@ -234,21 +239,21 @@ public class CollectionInputFormatTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testDeserializationFailure() {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+ ObjectOutputStream out = new ObjectOutputStream(buffer)) {
// a mock serializer that fails when writing
CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
Collections.singleton(new ElementType()), new TestSerializer(true, false));
out.writeObject(inFormat);
out.close();
-
+
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toByteArray());
ObjectInputStream in = new ObjectInputStream(bais);
-
+
try {
in.readObject();
fail("should throw an exception");
@@ -296,14 +301,14 @@ public class CollectionInputFormatTest {
private static class TestException extends IOException{
private static final long serialVersionUID = 1L;
}
-
+
private static class TestSerializer extends TypeSerializer<ElementType> {
private static final long serialVersionUID = 1L;
-
+
private final boolean failOnRead;
private final boolean failOnWrite;
-
+
public TestSerializer(boolean failOnRead, boolean failOnWrite) {
this.failOnRead = failOnRead;
this.failOnWrite = failOnWrite;