You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/03/25 20:45:17 UTC
[4/5] flink git commit: [FLINK-1512] [java api] Add CsvReader for
reading into POJOs
[FLINK-1512] [java api] Add CsvReader for reading into POJOs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b1c19cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b1c19cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b1c19cf
Branch: refs/heads/master
Commit: 7b1c19cfc234b26484ca8746b29f865b38b96147
Parents: 033c69f
Author: Chiwan Park <ch...@icloud.com>
Authored: Thu Feb 19 03:27:59 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 25 20:38:59 2015 +0100
----------------------------------------------------------------------
.../api/common/io/GenericCsvInputFormat.java | 136 +++++++-
.../flink/api/java/io/CsvInputFormat.java | 242 +++++---------
.../org/apache/flink/api/java/io/CsvReader.java | 88 +++--
.../flink/api/java/tuple/TupleGenerator.java | 2 +-
.../flink/api/java/io/CsvInputFormatTest.java | 330 ++++++++++++++++---
.../optimizer/ReplicatingDataSourceTest.java | 36 +-
.../flink/test/io/CsvReaderWithPOJOITCase.java | 144 ++++++++
7 files changed, 738 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 31a2a5a..1803a2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -29,12 +29,21 @@ import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
private static final long serialVersionUID = 1L;
@@ -50,6 +59,13 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
// --------------------------------------------------------------------------------------------
private transient FieldParser<?>[] fieldParsers;
+
+ // Speeds up readRecord processing. Used to detect Windows line endings.
+ // It is set when the format is opened so that readRecord does not have to evaluate it
+ protected boolean lineDelimiterIsLinebreak = false;
+
+ protected transient int commentCount;
+ protected transient int invalidLineCount;
// --------------------------------------------------------------------------------------------
@@ -58,7 +74,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private Class<?>[] fieldTypes = EMPTY_TYPES;
- private boolean[] fieldIncluded = EMPTY_INCLUDED;
+ protected boolean[] fieldIncluded = EMPTY_INCLUDED;
private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;
@@ -69,8 +85,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private boolean quotedStringParsing = false;
private byte quoteCharacter;
-
-
+
+ protected byte[] commentPrefix = null;
+
+
// --------------------------------------------------------------------------------------------
// Constructors and getters/setters for the configurable parameters
// --------------------------------------------------------------------------------------------
@@ -93,6 +111,46 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
return this.fieldTypes.length;
}
+ public byte[] getCommentPrefix() {
+ return commentPrefix;
+ }
+
+ public void setCommentPrefix(byte[] commentPrefix) {
+ this.commentPrefix = commentPrefix;
+ }
+
+ public void setCommentPrefix(char commentPrefix) {
+ setCommentPrefix(String.valueOf(commentPrefix));
+ }
+
+ public void setCommentPrefix(String commentPrefix) {
+ setCommentPrefix(commentPrefix, Charsets.UTF_8);
+ }
+
+ public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
+ if (charsetName == null) {
+ throw new IllegalArgumentException("Charset name must not be null");
+ }
+
+ if (commentPrefix != null) {
+ Charset charset = Charset.forName(charsetName);
+ setCommentPrefix(commentPrefix, charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
+ public void setCommentPrefix(String commentPrefix, Charset charset) {
+ if (charset == null) {
+ throw new IllegalArgumentException("Charset must not be null");
+ }
+ if (commentPrefix != null) {
+ this.commentPrefix = commentPrefix.getBytes(charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
public byte[] getFieldDelimiter() {
return fieldDelim;
}
@@ -291,7 +349,23 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
readLine(); // read and ignore
}
}
-
+
+ @Override
+ public void close() throws IOException {
+ if (this.invalidLineCount > 0) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+ }
+ }
+
+ if (this.commentCount > 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
+ }
+ }
+ super.close();
+ }
+
protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
boolean[] fieldIncluded = this.fieldIncluded;
@@ -400,4 +474,58 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
}
}
+
+ @SuppressWarnings("unused")
+ protected static void checkAndCoSort(int[] positions, Class<?>[] types) {
+ if (positions.length != types.length) {
+ throw new IllegalArgumentException("The positions and types must be of the same length");
+ }
+
+ TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
+
+ for (int i = 0; i < positions.length; i++) {
+ if (positions[i] < 0) {
+ throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
+ }
+ if (types[i] == null) {
+ throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+ }
+
+ if (map.containsKey(positions[i])) {
+ throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
+ }
+
+ map.put(positions[i], types[i]);
+ }
+
+ int i = 0;
+ for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
+ positions[i] = entry.getKey();
+ types[i] = entry.getValue();
+ i++;
+ }
+ }
+
+ protected static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
+ if (positions.length != types.length) {
+ throw new IllegalArgumentException("The positions and types must be of the same length");
+ }
+
+ int lastPos = -1;
+
+ for (int i = 0; i < positions.length; i++) {
+ if (positions[i] < 0) {
+ throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
+ }
+ if (types[i] == null) {
+ throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+ }
+
+ if (positions[i] <= lastPos) {
+ throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
+ }
+
+ lastPos = positions[i];
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 7669c39..ee33484 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -19,15 +19,12 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
-
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
@@ -35,11 +32,12 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
-public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT> {
+public class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
@@ -49,106 +47,90 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
public static final String DEFAULT_LINE_DELIMITER = "\n";
-
- public static final String DEFAULT_FIELD_DELIMITER = ",";
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
private transient Object[] parsedValues;
-
- private byte[] commentPrefix = null;
-
- // To speed up readRecord processing. Used to find windows line endings.
- // It is set when open so that readRecord does not have to evaluate it
- private boolean lineDelimiterIsLinebreak = false;
-
- private transient int commentCount;
- private transient int invalidLineCount;
-
-
- public CsvInputFormat(Path filePath) {
- super(filePath);
- }
-
- public CsvInputFormat(Path filePath, Class<?> ... types) {
- this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
- }
+ private Class<OUT> pojoTypeClass = null;
+ private String[] pojoFieldsName = null;
+ private transient Field[] pojoFields = null;
+ private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+ public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+ }
- public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+ public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
super(filePath);
+ Preconditions.checkArgument(typeInformation instanceof CompositeType);
+ CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
- setFieldTypes(types);
- }
-
-
- public byte[] getCommentPrefix() {
- return commentPrefix;
- }
-
- public void setCommentPrefix(byte[] commentPrefix) {
- this.commentPrefix = commentPrefix;
- }
-
- public void setCommentPrefix(char commentPrefix) {
- setCommentPrefix(String.valueOf(commentPrefix));
- }
-
- public void setCommentPrefix(String commentPrefix) {
- setCommentPrefix(commentPrefix, Charsets.UTF_8);
- }
-
- public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
- if (charsetName == null) {
- throw new IllegalArgumentException("Charset name must not be null");
+ Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+ for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+ classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-
- if (commentPrefix != null) {
- Charset charset = Charset.forName(charsetName);
- setCommentPrefix(commentPrefix, charset);
- } else {
- this.commentPrefix = null;
+ setFieldTypes(classes);
+
+ if (typeInformation instanceof PojoTypeInfo) {
+ pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+ pojoTypeClass = typeInformation.getTypeClass();
+ pojoFieldsName = compositeType.getFieldNames();
+ setOrderOfPOJOFields(pojoFieldsName);
}
}
-
- public void setCommentPrefix(String commentPrefix, Charset charset) {
- if (charset == null) {
- throw new IllegalArgumentException("Charset must not be null");
+
+ public void setOrderOfPOJOFields(String[] fieldsOrder) {
+ Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
+ Preconditions.checkNotNull(fieldsOrder);
+
+ int includedCount = 0;
+ for (boolean isIncluded : fieldIncluded) {
+ if (isIncluded) {
+ includedCount++;
+ }
}
- if (commentPrefix != null) {
- this.commentPrefix = commentPrefix.getBytes(charset);
- } else {
- this.commentPrefix = null;
+
+ Preconditions.checkArgument(includedCount == fieldsOrder.length, includedCount +
+ " CSV fields and " + fieldsOrder.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+ for (String field : fieldsOrder) {
+ Preconditions.checkNotNull(field, "The field name cannot be null.");
+ Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+ "Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
}
+
+ pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
}
-
-
- public void setFieldTypes(Class<?> ... fieldTypes) {
+
+ public void setFieldTypes(Class<?>... fieldTypes) {
if (fieldTypes == null || fieldTypes.length == 0) {
throw new IllegalArgumentException("Field types must not be null or empty.");
}
-
+
setFieldTypesGeneric(fieldTypes);
}
public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
Preconditions.checkNotNull(sourceFieldIndices);
Preconditions.checkNotNull(fieldTypes);
-
+
checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
+
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}
-
+
public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
Preconditions.checkNotNull(sourceFieldMask);
Preconditions.checkNotNull(fieldTypes);
-
+
setFieldsGeneric(sourceFieldMask, fieldTypes);
}
-
+
public Class<?>[] getFieldTypes() {
return super.getGenericFieldTypes();
}
@@ -176,25 +158,22 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
this.lineDelimiterIsLinebreak = true;
}
-
- this.commentCount = 0;
- this.invalidLineCount = 0;
- }
-
- @Override
- public void close() throws IOException {
- if (this.invalidLineCount > 0) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+
+ // for POJO type
+ if (pojoTypeClass != null) {
+ pojoFields = new Field[pojoFieldsName.length];
+ for (int i = 0; i < pojoFieldsName.length; i++) {
+ try {
+ pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
+ pojoFields[i].setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
+ }
}
}
- if (this.commentCount > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
- }
- }
- super.close();
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
}
@Override
@@ -203,10 +182,10 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
do {
returnRecord = super.nextRecord(record);
} while (returnRecord == null && !reachedEnd());
-
+
return returnRecord;
}
-
+
@Override
public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
/*
@@ -234,9 +213,21 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
}
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
- // valid parse, map values into pact record
- for (int i = 0; i < parsedValues.length; i++) {
- reuse.setField(parsedValues[i], i);
+ if (pojoTypeClass == null) {
+ // result type is tuple
+ Tuple result = (Tuple) reuse;
+ for (int i = 0; i < parsedValues.length; i++) {
+ result.setField(parsedValues[i], i);
+ }
+ } else {
+ // result type is POJO
+ for (int i = 0; i < parsedValues.length; i++) {
+ try {
+ pojoFields[i].set(reuse, parsedValues[i]);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
+ }
+ }
}
return reuse;
} else {
@@ -251,59 +242,4 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
}
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("unused")
- private static void checkAndCoSort(int[] positions, Class<?>[] types) {
- if (positions.length != types.length) {
- throw new IllegalArgumentException("The positions and types must be of the same length");
- }
-
- TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
-
- for (int i = 0; i < positions.length; i++) {
- if (positions[i] < 0) {
- throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
- }
- if (types[i] == null) {
- throw new IllegalArgumentException("The type " + i + " is invalid (null)");
- }
-
- if (map.containsKey(positions[i])) {
- throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
- }
-
- map.put(positions[i], types[i]);
- }
-
- int i = 0;
- for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
- positions[i] = entry.getKey();
- types[i] = entry.getValue();
- i++;
- }
- }
-
- private static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
- if (positions.length != types.length) {
- throw new IllegalArgumentException("The positions and types must be of the same length");
- }
-
- int lastPos = -1;
-
- for (int i = 0; i < positions.length; i++) {
- if (positions[i] < 0) {
- throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
- }
- if (types[i] == null) {
- throw new IllegalArgumentException("The type " + i + " is invalid (null)");
- }
-
- if (positions[i] <= lastPos) {
- throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
- }
-
- lastPos = positions[i];
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index ac879b7..11ef629 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -21,10 +21,12 @@ package org.apache.flink.api.java.io;
import java.util.ArrayList;
import java.util.Arrays;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
@@ -275,8 +277,37 @@ public class CsvReader {
ignoreInvalidLines = true;
return this;
}
-
-
+
+ /**
+ * Configures the reader to read the CSV data and parse it to the given type. All fields of the type
+ * must be public or otherwise settable. The type information for the fields is obtained from the type class.
+ *
+ * @param pojoType The class of the target POJO.
+ * @param pojoFields The fields of the POJO which are mapped to CSV fields.
+ * @return The DataSet representing the parsed CSV data.
+ */
+ public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
+ Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
+ Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");
+
+ @SuppressWarnings("unchecked")
+ PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType);
+ CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
+
+ Class<?>[] classes = new Class<?>[pojoFields.length];
+ for (int i = 0; i < pojoFields.length; i++) {
+ int pos = typeInfo.getFieldIndex(pojoFields[i]);
+ if(pos < 0) {
+ throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName());
+ }
+ classes[i] = typeInfo.getPojoFieldAt(pos).type.getTypeClass();
+ }
+
+ configureInputFormat(inputFormat, classes);
+ inputFormat.setOrderOfPOJOFields(pojoFields);
+
+ return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
+ }
/**
* Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
@@ -294,7 +325,7 @@ public class CsvReader {
@SuppressWarnings("unchecked")
TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
- CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path);
+ CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
Class<?>[] classes = new Class<?>[typeInfo.getArity()];
for (int i = 0; i < typeInfo.getArity(); i++) {
@@ -318,6 +349,7 @@ public class CsvReader {
if (this.parseQuotedStrings) {
format.enableQuotedStringParsing(this.quoteCharacter);
}
+
if (this.includedMask == null) {
format.setFieldTypes(types);
} else {
@@ -342,7 +374,7 @@ public class CsvReader {
*/
public <T0> DataSource<Tuple1<T0>> types(Class<T0> type0) {
TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0);
- CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path);
+ CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path, types);
configureInputFormat(inputFormat, type0);
return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -359,7 +391,7 @@ public class CsvReader {
*/
public <T0, T1> DataSource<Tuple2<T0, T1>> types(Class<T0> type0, Class<T1> type1) {
TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1);
- CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path);
+ CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path, types);
configureInputFormat(inputFormat, type0, type1);
return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -377,7 +409,7 @@ public class CsvReader {
*/
public <T0, T1, T2> DataSource<Tuple3<T0, T1, T2>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2) {
TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2);
- CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path);
+ CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2);
return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -396,7 +428,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3> DataSource<Tuple4<T0, T1, T2, T3>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) {
TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3);
- CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path);
+ CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3);
return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -416,7 +448,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4> DataSource<Tuple5<T0, T1, T2, T3, T4>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4) {
TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4);
- CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path);
+ CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -437,7 +469,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5> DataSource<Tuple6<T0, T1, T2, T3, T4, T5>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5) {
TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5);
- CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path);
+ CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -459,7 +491,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6> DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6) {
TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
- CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path);
+ CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -482,7 +514,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7> DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7) {
TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
- CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path);
+ CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -506,7 +538,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8> DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8) {
TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
- CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path);
+ CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -531,7 +563,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) {
TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
- CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path);
+ CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -557,7 +589,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10) {
TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
- CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path);
+ CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -584,7 +616,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11) {
TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
- CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path);
+ CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -612,7 +644,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12) {
TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
- CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path);
+ CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -641,7 +673,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13) {
TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
- CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path);
+ CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -671,7 +703,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14) {
TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
- CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path);
+ CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -702,7 +734,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15) {
TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
- CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path);
+ CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -734,7 +766,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16) {
TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
- CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path);
+ CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -767,7 +799,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) {
TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
- CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path);
+ CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -801,7 +833,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18) {
TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
- CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path);
+ CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -836,7 +868,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19) {
TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
- CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path);
+ CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -872,7 +904,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20) {
TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
- CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path);
+ CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -909,7 +941,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21) {
TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
- CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path);
+ CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -947,7 +979,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22) {
TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
- CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path);
+ CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -986,7 +1018,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23) {
TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
- CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path);
+ CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -1026,7 +1058,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23, Class<T24> type24) {
TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
- CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path);
+ CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index ed429e3..03826fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -470,7 +470,7 @@ class TupleGenerator {
appendTupleTypeGenerics(sb, numFields);
sb.append(">> inputFormat = new CsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
- sb.append(">>(path);\n");
+ sb.append(">>(path, types);\n");
// configure input format
sb.append("\t\tconfigureInputFormat(inputFormat, ");
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 6306f6e..bff3fec 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.io;
import com.google.common.base.Charsets;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -53,7 +53,7 @@ public class CsvInputFormatTest {
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
-
+
@Test
public void ignoreInvalidLines() {
try {
@@ -67,9 +67,9 @@ public class CsvInputFormatTest {
"#next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- CsvInputFormat<Tuple3<String, Integer, Double>> format =
- new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+ final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+ final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setLenient(true);
final Configuration parameters = new Configuration();
@@ -115,9 +115,9 @@ public class CsvInputFormatTest {
"#next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- CsvInputFormat<Tuple3<String, Integer, Double>> format =
- new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+ final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+ final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("#");
final Configuration parameters = new Configuration();
@@ -159,9 +159,9 @@ public class CsvInputFormatTest {
"//next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- CsvInputFormat<Tuple3<String, Integer, Double>> format =
- new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+ final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+ final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("//");
final Configuration parameters = new Configuration();
@@ -196,9 +196,10 @@ public class CsvInputFormatTest {
try {
final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class);
-
+
+ final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+ final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
+
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
@@ -239,7 +240,8 @@ public class CsvInputFormatTest {
final String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
final FileInputSplit split = createTempFile(fileContent);
- final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class);
+ final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+ final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
final Configuration parameters = new Configuration();
format.configure(parameters);
@@ -281,12 +283,12 @@ public class CsvInputFormatTest {
try {
final String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH);
+
+ final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+ final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
format.setFieldDelimiter("|-");
- format.setFieldTypes(String.class, String.class, String.class);
-
+
format.configure(new Configuration());
format.open(split);
@@ -324,12 +326,13 @@ public class CsvInputFormatTest {
try {
final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple5<Integer, Integer, Integer, Integer, Integer>> typeInfo =
+ TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
- format.setFieldTypes(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
-
+
format.configure(new Configuration());
format.open(split);
@@ -365,12 +368,12 @@ public class CsvInputFormatTest {
try {
final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
+ final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
- format.setFieldTypes(Integer.class, Integer.class);
-
+
format.configure(new Configuration());
format.open(split);
@@ -402,8 +405,9 @@ public class CsvInputFormatTest {
final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|x|");
format.setFieldTypes(Integer.class, null, null, Integer.class, null, null, null, Integer.class);
@@ -439,8 +443,9 @@ public class CsvInputFormatTest {
try {
final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
@@ -479,8 +484,9 @@ public class CsvInputFormatTest {
final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("&&");
@@ -516,7 +522,8 @@ public class CsvInputFormatTest {
@Test
public void testReadSparseWithShuffledPositions() throws IOException {
try {
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
@@ -570,8 +577,9 @@ public class CsvInputFormatTest {
final FileInputSplit split = createTempFile(fileContent);
- final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format =
- new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH);
+ final TupleTypeInfo<Tuple5<Integer, String, String, String, Double>> typeInfo =
+ TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class, String.class, String.class, Double.class);
+ final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
format.setSkipFirstLineAsHeader(true);
format.setFieldDelimiter(',');
@@ -651,9 +659,10 @@ public class CsvInputFormatTest {
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write(fileContent);
wrt.close();
-
- CsvInputFormat<Tuple1<String>> inputFormat = new CsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()),String.class);
-
+
+ final TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
+ final CsvInputFormat<Tuple1<String>> inputFormat = new CsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
+
Configuration parameters = new Configuration();
inputFormat.configure(parameters);
@@ -684,4 +693,241 @@ public class CsvInputFormatTest {
}
}
+ private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
+ PojoItem item = new PojoItem();
+ // Reads two records into one reused PojoItem and asserts all four fields after each read.
+ format.nextRecord(item);
+
+ assertEquals(123, item.field1);
+ assertEquals("AAA", item.field2);
+ assertEquals(Double.valueOf(3.123), item.field3);
+ assertEquals("BBB", item.field4);
+ // The second record is read into the same item instance.
+ format.nextRecord(item);
+
+ assertEquals(456, item.field1);
+ assertEquals("BBB", item.field2);
+ assertEquals(Double.valueOf(1.123), item.field3);
+ assertEquals("AAA", item.field4);
+ }
+
+ @Test
+ public void testPojoType() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+ // Two rows whose columns appear in the order field1, field2, field3, field4.
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,AAA,3.123,BBB\n");
+ wrt.write("456,BBB,1.123,AAA\n");
+ wrt.close();
+ // The format is built from the extracted type information alone; no field selection or order is set.
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ validatePojoItem(inputFormat);
+ }
+
+ @Test
+ public void testPojoTypeWithPrivateField() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+ // Two rows in field1..field4 column order; the target POJO declares its fields private with getters/setters.
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,AAA,3.123,BBB\n");
+ wrt.write("456,BBB,1.123,AAA\n");
+ wrt.close();
+ // The format is built from the extracted type information alone; no field selection or order is set.
+ @SuppressWarnings("unchecked")
+ TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+ CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+ inputFormat.configure(new Configuration());
+
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+ inputFormat.open(splits[0]);
+ // The private fields are read directly below; this compiles because PrivatePojoItem is nested in this test class.
+ PrivatePojoItem item = new PrivatePojoItem();
+ inputFormat.nextRecord(item);
+
+ assertEquals(123, item.field1);
+ assertEquals("AAA", item.field2);
+ assertEquals(Double.valueOf(3.123), item.field3);
+ assertEquals("BBB", item.field4);
+
+ inputFormat.nextRecord(item);
+
+ assertEquals(456, item.field1);
+ assertEquals("BBB", item.field2);
+ assertEquals(Double.valueOf(1.123), item.field3);
+ assertEquals("AAA", item.field4);
+ }
+
+ @Test
+ public void testPojoTypeWithMappingInformation() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+ // Columns are written as int, double, string, string, i.e. not in the POJO's field1..field4 order.
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,3.123,AAA,BBB\n");
+ wrt.write("456,1.123,BBB,AAA\n");
+ wrt.close();
+ // All four columns are included; the test expects the names passed to setOrderOfPOJOFields to pair with the columns in file order.
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.setFields(new boolean[]{true, true, true, true}, new Class<?>[]{Integer.class, Double.class, String.class, String.class});
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field3", "field2", "field4"});
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ validatePojoItem(inputFormat);
+ }
+
+ @Test
+ public void testPojoTypeWithPartialFieldInCSV() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+ // Six columns per row; the two NODATA columns sit at zero-based positions 1 and 3.
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,NODATA,AAA,NODATA,3.123,BBB\n");
+ wrt.write("456,NODATA,BBB,NODATA,1.123,AAA\n");
+ wrt.close();
+ // The boolean mask passed to setFields is false at positions 1 and 3; four types are given for the four included columns.
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String.class, Double.class, String.class});
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ validatePojoItem(inputFormat);
+ }
+
+ @Test
+ public void testPojoTypeWithMappingInfoAndPartialField() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+ // Four columns per row: int, double, string, string.
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,3.123,AAA,BBB\n");
+ wrt.write("456,1.123,BBB,AAA\n");
+ wrt.close();
+ // Only columns 0 and 3 are included by the mask, and they are paired with field1 and field4.
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.setFields(new boolean[]{true, false, false, true}, new Class[]{Integer.class, String.class});
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field4"});
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ PojoItem item = new PojoItem();
+ inputFormat.nextRecord(item);
+ // Only the first record is read, and only the two mapped fields are asserted.
+ assertEquals(123, item.field1);
+ assertEquals("BBB", item.field4);
+ }
+
+ @Test
+ public void testPojoTypeWithInvalidFieldMapping() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+ try {
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2"});
+ fail("The number of POJO fields cannot be same as that of selected CSV fields");
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+
+ try {
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", null, "field4"});
+ fail("Fields mapping cannot contain null.");
+ } catch (NullPointerException e) {
+ // success
+ }
+
+ try {
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", "field3", "field5"});
+ fail("Invalid field name");
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom types for testing
+ // --------------------------------------------------------------------------------------------
+
+ public static class PojoItem { // test POJO with four public fields, filled by CsvInputFormat in the tests above
+ public int field1;
+ public String field2;
+ public Double field3; // boxed Double; the tests compare it against Double.valueOf(...)
+ public String field4;
+ }
+
+ public static class PrivatePojoItem { // same four fields as PojoItem, but private
+ private int field1;
+ private String field2;
+ private Double field3;
+ private String field4;
+ // One public getter/setter pair per private field follows.
+ public int getField1() {
+ return field1;
+ }
+
+ public void setField1(int field1) {
+ this.field1 = field1;
+ }
+
+ public String getField2() {
+ return field2;
+ }
+
+ public void setField2(String field2) {
+ this.field2 = field2;
+ }
+
+ public Double getField3() {
+ return field3;
+ }
+
+ public void setField3(Double field3) {
+ this.field3 = field3;
+ }
+
+ public String getField4() {
+ return field4;
+ }
+
+ public void setField4(String field4) {
+ this.field4 = field4;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index fd451f7..230cc6b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -55,8 +55,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -91,8 +92,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -128,8 +130,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -165,8 +168,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -202,8 +206,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -239,8 +244,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -279,8 +285,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -315,8 +322,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -353,8 +361,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -378,8 +387,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -404,8 +414,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -429,8 +440,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);