You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/03/25 20:45:14 UTC
[1/5] flink git commit: [FLINK-1512] [tests] Add integration tests
for CsvReader
Repository: flink
Updated Branches:
refs/heads/master 033c69f94 -> 1b42b6206
[FLINK-1512] [tests] Add integration tests for CsvReader
This closes #426
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43ac967a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43ac967a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43ac967a
Branch: refs/heads/master
Commit: 43ac967acb589790f5b3befd6f932e325d4ba681
Parents: 7a6f296
Author: Chiwan Park <ch...@icloud.com>
Authored: Wed Mar 25 15:22:43 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 25 20:38:59 2015 +0100
----------------------------------------------------------------------
.../flink/api/scala/io/CsvInputFormatTest.scala | 68 +++++++++-----------
1 file changed, 32 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/43ac967a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 4bcd35a..0d74515 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -413,6 +413,31 @@ class CsvInputFormatTest {
case class CaseClassItem(field1: Int, field2: String, field3: Double)
+ private def validatePOJOItem(format: ScalaCsvInputFormat[POJOItem]): Unit = {
+ var result = new POJOItem()
+ result = format.nextRecord(result)
+ assertEquals(123, result.field1)
+ assertEquals("HELLO", result.field2)
+ assertEquals(3.123, result.field3, 0.001)
+
+ result = format.nextRecord(result)
+ assertEquals(456, result.field1)
+ assertEquals("ABC", result.field2)
+ assertEquals(1.234, result.field3, 0.001)
+ }
+
+ private def validateCaseClassItem(format: ScalaCsvInputFormat[CaseClassItem]): Unit = {
+ var result = format.nextRecord(null)
+ assertEquals(123, result.field1)
+ assertEquals("HELLO", result.field2)
+ assertEquals(3.123, result.field3, 0.001)
+
+ result = format.nextRecord(null)
+ assertEquals(456, result.field1)
+ assertEquals("ABC", result.field2)
+ assertEquals(1.234, result.field3, 0.001)
+ }
+
@Test
def testPOJOType(): Unit = {
val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
@@ -425,16 +450,7 @@ class CsvInputFormatTest {
format.configure(new Configuration)
format.open(tempFile)
- var result = new POJOItem()
- result = format.nextRecord(result)
- assertEquals(123, result.field1)
- assertEquals("HELLO", result.field2)
- assertEquals(3.123, result.field3, 0.001)
-
- result = format.nextRecord(result)
- assertEquals(456, result.field1)
- assertEquals("ABC", result.field2)
- assertEquals(1.234, result.field3, 0.001)
+ validatePOJOItem(format)
}
@Test
@@ -449,15 +465,7 @@ class CsvInputFormatTest {
format.configure(new Configuration)
format.open(tempFile)
- var result = format.nextRecord(null)
- assertEquals(123, result.field1)
- assertEquals("HELLO", result.field2)
- assertEquals(3.123, result.field3, 0.001)
-
- result = format.nextRecord(null)
- assertEquals(456, result.field1)
- assertEquals("ABC", result.field2)
- assertEquals(1.234, result.field3, 0.001)
+ validateCaseClassItem(format)
}
@Test
@@ -474,36 +482,24 @@ class CsvInputFormatTest {
format.configure(new Configuration)
format.open(tempFile)
- var result = new POJOItem()
- result = format.nextRecord(result)
- assertEquals(123, result.field1)
- assertEquals("HELLO", result.field2)
- assertEquals(3.123, result.field3, 0.001)
-
- result = format.nextRecord(result)
- assertEquals(456, result.field1)
- assertEquals("ABC", result.field2)
- assertEquals(1.234, result.field3, 0.001)
+ validatePOJOItem(format)
}
@Test
def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
- val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
+ val fileContent = "HELLO,123,NODATA,3.123,NODATA\n" + "ABC,456,NODATA,1.234,NODATA"
val tempFile = createTempFile(fileContent)
val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
format.setDelimiter('\n')
format.setFieldDelimiter(',')
- format.setFields(Array(false, true), Array(classOf[String]): Array[Class[_]])
+ format.setFields(Array(true, true, false, true, false),
+ Array(classOf[String], classOf[Integer], classOf[java.lang.Double]): Array[Class[_]])
format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
format.configure(new Configuration)
format.open(tempFile)
- var result = format.nextRecord(new POJOItem())
- assertEquals("HELLO", result.field2)
-
- result = format.nextRecord(result)
- assertEquals("ABC", result.field2)
+ validatePOJOItem(format)
}
}
[4/5] flink git commit: [FLINK-1512] [java api] Add CsvReader for
reading into POJOs
Posted by fh...@apache.org.
[FLINK-1512] [java api] Add CsvReader for reading into POJOs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b1c19cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b1c19cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b1c19cf
Branch: refs/heads/master
Commit: 7b1c19cfc234b26484ca8746b29f865b38b96147
Parents: 033c69f
Author: Chiwan Park <ch...@icloud.com>
Authored: Thu Feb 19 03:27:59 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 25 20:38:59 2015 +0100
----------------------------------------------------------------------
.../api/common/io/GenericCsvInputFormat.java | 136 +++++++-
.../flink/api/java/io/CsvInputFormat.java | 242 +++++---------
.../org/apache/flink/api/java/io/CsvReader.java | 88 +++--
.../flink/api/java/tuple/TupleGenerator.java | 2 +-
.../flink/api/java/io/CsvInputFormatTest.java | 330 ++++++++++++++++---
.../optimizer/ReplicatingDataSourceTest.java | 36 +-
.../flink/test/io/CsvReaderWithPOJOITCase.java | 144 ++++++++
7 files changed, 738 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 31a2a5a..1803a2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -29,12 +29,21 @@ import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
private static final long serialVersionUID = 1L;
@@ -50,6 +59,13 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
// --------------------------------------------------------------------------------------------
private transient FieldParser<?>[] fieldParsers;
+
+ // To speed up readRecord processing. Used to find windows line endings.
+ // It is set when open so that readRecord does not have to evaluate it
+ protected boolean lineDelimiterIsLinebreak = false;
+
+ protected transient int commentCount;
+ protected transient int invalidLineCount;
// --------------------------------------------------------------------------------------------
@@ -58,7 +74,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private Class<?>[] fieldTypes = EMPTY_TYPES;
- private boolean[] fieldIncluded = EMPTY_INCLUDED;
+ protected boolean[] fieldIncluded = EMPTY_INCLUDED;
private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;
@@ -69,8 +85,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private boolean quotedStringParsing = false;
private byte quoteCharacter;
-
-
+
+ protected byte[] commentPrefix = null;
+
+
// --------------------------------------------------------------------------------------------
// Constructors and getters/setters for the configurable parameters
// --------------------------------------------------------------------------------------------
@@ -93,6 +111,46 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
return this.fieldTypes.length;
}
+ public byte[] getCommentPrefix() {
+ return commentPrefix;
+ }
+
+ public void setCommentPrefix(byte[] commentPrefix) {
+ this.commentPrefix = commentPrefix;
+ }
+
+ public void setCommentPrefix(char commentPrefix) {
+ setCommentPrefix(String.valueOf(commentPrefix));
+ }
+
+ public void setCommentPrefix(String commentPrefix) {
+ setCommentPrefix(commentPrefix, Charsets.UTF_8);
+ }
+
+ public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
+ if (charsetName == null) {
+ throw new IllegalArgumentException("Charset name must not be null");
+ }
+
+ if (commentPrefix != null) {
+ Charset charset = Charset.forName(charsetName);
+ setCommentPrefix(commentPrefix, charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
+ public void setCommentPrefix(String commentPrefix, Charset charset) {
+ if (charset == null) {
+ throw new IllegalArgumentException("Charset must not be null");
+ }
+ if (commentPrefix != null) {
+ this.commentPrefix = commentPrefix.getBytes(charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
public byte[] getFieldDelimiter() {
return fieldDelim;
}
@@ -291,7 +349,23 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
readLine(); // read and ignore
}
}
-
+
+ @Override
+ public void close() throws IOException {
+ if (this.invalidLineCount > 0) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+ }
+ }
+
+ if (this.commentCount > 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
+ }
+ }
+ super.close();
+ }
+
protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
boolean[] fieldIncluded = this.fieldIncluded;
@@ -400,4 +474,58 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
}
}
+
+ @SuppressWarnings("unused")
+ protected static void checkAndCoSort(int[] positions, Class<?>[] types) {
+ if (positions.length != types.length) {
+ throw new IllegalArgumentException("The positions and types must be of the same length");
+ }
+
+ TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
+
+ for (int i = 0; i < positions.length; i++) {
+ if (positions[i] < 0) {
+ throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
+ }
+ if (types[i] == null) {
+ throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+ }
+
+ if (map.containsKey(positions[i])) {
+ throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
+ }
+
+ map.put(positions[i], types[i]);
+ }
+
+ int i = 0;
+ for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
+ positions[i] = entry.getKey();
+ types[i] = entry.getValue();
+ i++;
+ }
+ }
+
+ protected static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
+ if (positions.length != types.length) {
+ throw new IllegalArgumentException("The positions and types must be of the same length");
+ }
+
+ int lastPos = -1;
+
+ for (int i = 0; i < positions.length; i++) {
+ if (positions[i] < 0) {
+ throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
+ }
+ if (types[i] == null) {
+ throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+ }
+
+ if (positions[i] <= lastPos) {
+ throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
+ }
+
+ lastPos = positions[i];
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 7669c39..ee33484 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -19,15 +19,12 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
-
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
@@ -35,11 +32,12 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
-public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT> {
+public class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
@@ -49,106 +47,90 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
public static final String DEFAULT_LINE_DELIMITER = "\n";
-
- public static final String DEFAULT_FIELD_DELIMITER = ",";
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
private transient Object[] parsedValues;
-
- private byte[] commentPrefix = null;
-
- // To speed up readRecord processing. Used to find windows line endings.
- // It is set when open so that readRecord does not have to evaluate it
- private boolean lineDelimiterIsLinebreak = false;
-
- private transient int commentCount;
- private transient int invalidLineCount;
-
-
- public CsvInputFormat(Path filePath) {
- super(filePath);
- }
-
- public CsvInputFormat(Path filePath, Class<?> ... types) {
- this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
- }
+ private Class<OUT> pojoTypeClass = null;
+ private String[] pojoFieldsName = null;
+ private transient Field[] pojoFields = null;
+ private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+ public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+ }
- public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+ public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
super(filePath);
+ Preconditions.checkArgument(typeInformation instanceof CompositeType);
+ CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
- setFieldTypes(types);
- }
-
-
- public byte[] getCommentPrefix() {
- return commentPrefix;
- }
-
- public void setCommentPrefix(byte[] commentPrefix) {
- this.commentPrefix = commentPrefix;
- }
-
- public void setCommentPrefix(char commentPrefix) {
- setCommentPrefix(String.valueOf(commentPrefix));
- }
-
- public void setCommentPrefix(String commentPrefix) {
- setCommentPrefix(commentPrefix, Charsets.UTF_8);
- }
-
- public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
- if (charsetName == null) {
- throw new IllegalArgumentException("Charset name must not be null");
+ Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+ for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+ classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-
- if (commentPrefix != null) {
- Charset charset = Charset.forName(charsetName);
- setCommentPrefix(commentPrefix, charset);
- } else {
- this.commentPrefix = null;
+ setFieldTypes(classes);
+
+ if (typeInformation instanceof PojoTypeInfo) {
+ pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+ pojoTypeClass = typeInformation.getTypeClass();
+ pojoFieldsName = compositeType.getFieldNames();
+ setOrderOfPOJOFields(pojoFieldsName);
}
}
-
- public void setCommentPrefix(String commentPrefix, Charset charset) {
- if (charset == null) {
- throw new IllegalArgumentException("Charset must not be null");
+
+ public void setOrderOfPOJOFields(String[] fieldsOrder) {
+ Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
+ Preconditions.checkNotNull(fieldsOrder);
+
+ int includedCount = 0;
+ for (boolean isIncluded : fieldIncluded) {
+ if (isIncluded) {
+ includedCount++;
+ }
}
- if (commentPrefix != null) {
- this.commentPrefix = commentPrefix.getBytes(charset);
- } else {
- this.commentPrefix = null;
+
+ Preconditions.checkArgument(includedCount == fieldsOrder.length, includedCount +
+ " CSV fields and " + fieldsOrder.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+ for (String field : fieldsOrder) {
+ Preconditions.checkNotNull(field, "The field name cannot be null.");
+ Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+ "Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
}
+
+ pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
}
-
-
- public void setFieldTypes(Class<?> ... fieldTypes) {
+
+ public void setFieldTypes(Class<?>... fieldTypes) {
if (fieldTypes == null || fieldTypes.length == 0) {
throw new IllegalArgumentException("Field types must not be null or empty.");
}
-
+
setFieldTypesGeneric(fieldTypes);
}
public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
Preconditions.checkNotNull(sourceFieldIndices);
Preconditions.checkNotNull(fieldTypes);
-
+
checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
+
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}
-
+
public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
Preconditions.checkNotNull(sourceFieldMask);
Preconditions.checkNotNull(fieldTypes);
-
+
setFieldsGeneric(sourceFieldMask, fieldTypes);
}
-
+
public Class<?>[] getFieldTypes() {
return super.getGenericFieldTypes();
}
@@ -176,25 +158,22 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
this.lineDelimiterIsLinebreak = true;
}
-
- this.commentCount = 0;
- this.invalidLineCount = 0;
- }
-
- @Override
- public void close() throws IOException {
- if (this.invalidLineCount > 0) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+
+ // for POJO type
+ if (pojoTypeClass != null) {
+ pojoFields = new Field[pojoFieldsName.length];
+ for (int i = 0; i < pojoFieldsName.length; i++) {
+ try {
+ pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
+ pojoFields[i].setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
+ }
}
}
- if (this.commentCount > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
- }
- }
- super.close();
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
}
@Override
@@ -203,10 +182,10 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
do {
returnRecord = super.nextRecord(record);
} while (returnRecord == null && !reachedEnd());
-
+
return returnRecord;
}
-
+
@Override
public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
/*
@@ -234,9 +213,21 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
}
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
- // valid parse, map values into pact record
- for (int i = 0; i < parsedValues.length; i++) {
- reuse.setField(parsedValues[i], i);
+ if (pojoTypeClass == null) {
+ // result type is tuple
+ Tuple result = (Tuple) reuse;
+ for (int i = 0; i < parsedValues.length; i++) {
+ result.setField(parsedValues[i], i);
+ }
+ } else {
+ // result type is POJO
+ for (int i = 0; i < parsedValues.length; i++) {
+ try {
+ pojoFields[i].set(reuse, parsedValues[i]);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
+ }
+ }
}
return reuse;
} else {
@@ -251,59 +242,4 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
}
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("unused")
- private static void checkAndCoSort(int[] positions, Class<?>[] types) {
- if (positions.length != types.length) {
- throw new IllegalArgumentException("The positions and types must be of the same length");
- }
-
- TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
-
- for (int i = 0; i < positions.length; i++) {
- if (positions[i] < 0) {
- throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
- }
- if (types[i] == null) {
- throw new IllegalArgumentException("The type " + i + " is invalid (null)");
- }
-
- if (map.containsKey(positions[i])) {
- throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
- }
-
- map.put(positions[i], types[i]);
- }
-
- int i = 0;
- for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
- positions[i] = entry.getKey();
- types[i] = entry.getValue();
- i++;
- }
- }
-
- private static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
- if (positions.length != types.length) {
- throw new IllegalArgumentException("The positions and types must be of the same length");
- }
-
- int lastPos = -1;
-
- for (int i = 0; i < positions.length; i++) {
- if (positions[i] < 0) {
- throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
- }
- if (types[i] == null) {
- throw new IllegalArgumentException("The type " + i + " is invalid (null)");
- }
-
- if (positions[i] <= lastPos) {
- throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
- }
-
- lastPos = positions[i];
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index ac879b7..11ef629 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -21,10 +21,12 @@ package org.apache.flink.api.java.io;
import java.util.ArrayList;
import java.util.Arrays;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
@@ -275,8 +277,37 @@ public class CsvReader {
ignoreInvalidLines = true;
return this;
}
-
-
+
+ /**
+ + * Configures the reader to read the CSV data and parse it to the given type. All fields of the type
+ + * must be public or settable. The type information for the fields is obtained from the type class.
+ + *
+ + * @param pojoType The class of the target POJO.
+ + * @param pojoFields The fields of the POJO which are mapped to CSV fields.
+ + * @return The DataSet representing the parsed CSV data.
+ + */
+ public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
+ Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
+ Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");
+
+ @SuppressWarnings("unchecked")
+ PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType);
+ CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
+
+ Class<?>[] classes = new Class<?>[pojoFields.length];
+ for (int i = 0; i < pojoFields.length; i++) {
+ int pos = typeInfo.getFieldIndex(pojoFields[i]);
+ if(pos < 0) {
+ throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName());
+ }
+ classes[i] = typeInfo.getPojoFieldAt(pos).type.getTypeClass();
+ }
+
+ configureInputFormat(inputFormat, classes);
+ inputFormat.setOrderOfPOJOFields(pojoFields);
+
+ return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
+ }
/**
* Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
@@ -294,7 +325,7 @@ public class CsvReader {
@SuppressWarnings("unchecked")
TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
- CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path);
+ CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
Class<?>[] classes = new Class<?>[typeInfo.getArity()];
for (int i = 0; i < typeInfo.getArity(); i++) {
@@ -318,6 +349,7 @@ public class CsvReader {
if (this.parseQuotedStrings) {
format.enableQuotedStringParsing(this.quoteCharacter);
}
+
if (this.includedMask == null) {
format.setFieldTypes(types);
} else {
@@ -342,7 +374,7 @@ public class CsvReader {
*/
public <T0> DataSource<Tuple1<T0>> types(Class<T0> type0) {
TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0);
- CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path);
+ CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path, types);
configureInputFormat(inputFormat, type0);
return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -359,7 +391,7 @@ public class CsvReader {
*/
public <T0, T1> DataSource<Tuple2<T0, T1>> types(Class<T0> type0, Class<T1> type1) {
TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1);
- CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path);
+ CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path, types);
configureInputFormat(inputFormat, type0, type1);
return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -377,7 +409,7 @@ public class CsvReader {
*/
public <T0, T1, T2> DataSource<Tuple3<T0, T1, T2>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2) {
TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2);
- CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path);
+ CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2);
return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -396,7 +428,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3> DataSource<Tuple4<T0, T1, T2, T3>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) {
TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3);
- CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path);
+ CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3);
return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -416,7 +448,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4> DataSource<Tuple5<T0, T1, T2, T3, T4>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4) {
TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4);
- CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path);
+ CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -437,7 +469,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5> DataSource<Tuple6<T0, T1, T2, T3, T4, T5>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5) {
TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5);
- CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path);
+ CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -459,7 +491,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6> DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6) {
TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
- CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path);
+ CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -482,7 +514,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7> DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7) {
TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
- CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path);
+ CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -506,7 +538,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8> DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8) {
TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
- CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path);
+ CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -531,7 +563,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) {
TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
- CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path);
+ CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -557,7 +589,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10) {
TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
- CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path);
+ CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -584,7 +616,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11) {
TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
- CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path);
+ CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -612,7 +644,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12) {
TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
- CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path);
+ CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -641,7 +673,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13) {
TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
- CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path);
+ CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -671,7 +703,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14) {
TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
- CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path);
+ CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -702,7 +734,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15) {
TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
- CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path);
+ CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -734,7 +766,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16) {
TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
- CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path);
+ CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -767,7 +799,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) {
TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
- CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path);
+ CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -801,7 +833,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18) {
TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
- CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path);
+ CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -836,7 +868,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19) {
TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
- CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path);
+ CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -872,7 +904,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20) {
TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
- CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path);
+ CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -909,7 +941,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21) {
TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
- CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path);
+ CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -947,7 +979,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22) {
TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
- CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path);
+ CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -986,7 +1018,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23) {
TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
- CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path);
+ CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -1026,7 +1058,7 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23, Class<T24> type24) {
TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
- CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path);
+ CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types);
configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index ed429e3..03826fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -470,7 +470,7 @@ class TupleGenerator {
appendTupleTypeGenerics(sb, numFields);
sb.append(">> inputFormat = new CsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
- sb.append(">>(path);\n");
+ sb.append(">>(path, types);\n");
// configure input format
sb.append("\t\tconfigureInputFormat(inputFormat, ");
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 6306f6e..bff3fec 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.io;
import com.google.common.base.Charsets;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -53,7 +53,7 @@ public class CsvInputFormatTest {
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
-
+
@Test
public void ignoreInvalidLines() {
try {
@@ -67,9 +67,9 @@ public class CsvInputFormatTest {
"#next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- CsvInputFormat<Tuple3<String, Integer, Double>> format =
- new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+ final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+ final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setLenient(true);
final Configuration parameters = new Configuration();
@@ -115,9 +115,9 @@ public class CsvInputFormatTest {
"#next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- CsvInputFormat<Tuple3<String, Integer, Double>> format =
- new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+ final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+ final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("#");
final Configuration parameters = new Configuration();
@@ -159,9 +159,9 @@ public class CsvInputFormatTest {
"//next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- CsvInputFormat<Tuple3<String, Integer, Double>> format =
- new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+ final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+ final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("//");
final Configuration parameters = new Configuration();
@@ -196,9 +196,10 @@ public class CsvInputFormatTest {
try {
final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class);
-
+
+ final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+ final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
+
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
@@ -239,7 +240,8 @@ public class CsvInputFormatTest {
final String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
final FileInputSplit split = createTempFile(fileContent);
- final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class);
+ final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+ final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
final Configuration parameters = new Configuration();
format.configure(parameters);
@@ -281,12 +283,12 @@ public class CsvInputFormatTest {
try {
final String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH);
+
+ final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+ final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
format.setFieldDelimiter("|-");
- format.setFieldTypes(String.class, String.class, String.class);
-
+
format.configure(new Configuration());
format.open(split);
@@ -324,12 +326,13 @@ public class CsvInputFormatTest {
try {
final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple5<Integer, Integer, Integer, Integer, Integer>> typeInfo =
+ TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
- format.setFieldTypes(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
-
+
format.configure(new Configuration());
format.open(split);
@@ -365,12 +368,12 @@ public class CsvInputFormatTest {
try {
final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
+ final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
- format.setFieldTypes(Integer.class, Integer.class);
-
+
format.configure(new Configuration());
format.open(split);
@@ -402,8 +405,9 @@ public class CsvInputFormatTest {
final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|x|");
format.setFieldTypes(Integer.class, null, null, Integer.class, null, null, null, Integer.class);
@@ -439,8 +443,9 @@ public class CsvInputFormatTest {
try {
final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
@@ -479,8 +484,9 @@ public class CsvInputFormatTest {
final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
final FileInputSplit split = createTempFile(fileContent);
-
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("&&");
@@ -516,7 +522,8 @@ public class CsvInputFormatTest {
@Test
public void testReadSparseWithShuffledPositions() throws IOException {
try {
- final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+ final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+ final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
@@ -570,8 +577,9 @@ public class CsvInputFormatTest {
final FileInputSplit split = createTempFile(fileContent);
- final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format =
- new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH);
+ final TupleTypeInfo<Tuple5<Integer, String, String, String, Double>> typeInfo =
+ TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class, String.class, String.class, Double.class);
+ final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
format.setSkipFirstLineAsHeader(true);
format.setFieldDelimiter(',');
@@ -651,9 +659,10 @@ public class CsvInputFormatTest {
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write(fileContent);
wrt.close();
-
- CsvInputFormat<Tuple1<String>> inputFormat = new CsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()),String.class);
-
+
+ final TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
+ final CsvInputFormat<Tuple1<String>> inputFormat = new CsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
+
Configuration parameters = new Configuration();
inputFormat.configure(parameters);
@@ -684,4 +693,241 @@ public class CsvInputFormatTest {
}
}
+ private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
+ PojoItem item = new PojoItem();
+
+ format.nextRecord(item);
+
+ assertEquals(123, item.field1);
+ assertEquals("AAA", item.field2);
+ assertEquals(Double.valueOf(3.123), item.field3);
+ assertEquals("BBB", item.field4);
+
+ format.nextRecord(item);
+
+ assertEquals(456, item.field1);
+ assertEquals("BBB", item.field2);
+ assertEquals(Double.valueOf(1.123), item.field3);
+ assertEquals("AAA", item.field4);
+ }
+
+ @Test
+ public void testPojoType() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,AAA,3.123,BBB\n");
+ wrt.write("456,BBB,1.123,AAA\n");
+ wrt.close();
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ validatePojoItem(inputFormat);
+ }
+
+ @Test
+ public void testPojoTypeWithPrivateField() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,AAA,3.123,BBB\n");
+ wrt.write("456,BBB,1.123,AAA\n");
+ wrt.close();
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+ CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+ inputFormat.configure(new Configuration());
+
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+ inputFormat.open(splits[0]);
+
+ PrivatePojoItem item = new PrivatePojoItem();
+ inputFormat.nextRecord(item);
+
+ assertEquals(123, item.field1);
+ assertEquals("AAA", item.field2);
+ assertEquals(Double.valueOf(3.123), item.field3);
+ assertEquals("BBB", item.field4);
+
+ inputFormat.nextRecord(item);
+
+ assertEquals(456, item.field1);
+ assertEquals("BBB", item.field2);
+ assertEquals(Double.valueOf(1.123), item.field3);
+ assertEquals("AAA", item.field4);
+ }
+
+ @Test
+ public void testPojoTypeWithMappingInformation() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,3.123,AAA,BBB\n");
+ wrt.write("456,1.123,BBB,AAA\n");
+ wrt.close();
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.setFields(new boolean[]{true, true, true, true}, new Class<?>[]{Integer.class, Double.class, String.class, String.class});
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field3", "field2", "field4"});
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ validatePojoItem(inputFormat);
+ }
+
+ @Test
+ public void testPojoTypeWithPartialFieldInCSV() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,NODATA,AAA,NODATA,3.123,BBB\n");
+ wrt.write("456,NODATA,BBB,NODATA,1.123,AAA\n");
+ wrt.close();
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String.class, Double.class, String.class});
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ validatePojoItem(inputFormat);
+ }
+
+ @Test
+ public void testPojoTypeWithMappingInfoAndPartialField() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+ wrt.write("123,3.123,AAA,BBB\n");
+ wrt.write("456,1.123,BBB,AAA\n");
+ wrt.close();
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.setFields(new boolean[]{true, false, false, true}, new Class[]{Integer.class, String.class});
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field4"});
+
+ inputFormat.configure(new Configuration());
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+ inputFormat.open(splits[0]);
+
+ PojoItem item = new PojoItem();
+ inputFormat.nextRecord(item);
+
+ assertEquals(123, item.field1);
+ assertEquals("BBB", item.field4);
+ }
+
+ @Test
+ public void testPojoTypeWithInvalidFieldMapping() throws Exception {
+ File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+ CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+ try {
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2"});
+ fail("The number of POJO fields cannot be same as that of selected CSV fields");
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+
+ try {
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", null, "field4"});
+ fail("Fields mapping cannot contain null.");
+ } catch (NullPointerException e) {
+ // success
+ }
+
+ try {
+ inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", "field3", "field5"});
+ fail("Invalid field name");
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom types for testing
+ // --------------------------------------------------------------------------------------------
+
+ public static class PojoItem {
+ public int field1;
+ public String field2;
+ public Double field3;
+ public String field4;
+ }
+
+ public static class PrivatePojoItem {
+ private int field1;
+ private String field2;
+ private Double field3;
+ private String field4;
+
+ public int getField1() {
+ return field1;
+ }
+
+ public void setField1(int field1) {
+ this.field1 = field1;
+ }
+
+ public String getField2() {
+ return field2;
+ }
+
+ public void setField2(String field2) {
+ this.field2 = field2;
+ }
+
+ public Double getField3() {
+ return field3;
+ }
+
+ public void setField3(Double field3) {
+ this.field3 = field3;
+ }
+
+ public String getField4() {
+ return field4;
+ }
+
+ public void setField4(String field4) {
+ this.field4 = field4;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index fd451f7..230cc6b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -55,8 +55,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -91,8 +92,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -128,8 +130,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -165,8 +168,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -202,8 +206,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -239,8 +244,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -279,8 +285,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -315,8 +322,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -353,8 +361,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -378,8 +387,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -404,8 +414,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -429,8 +440,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
+ TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
- new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+ new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
[3/5] flink git commit: [FLINK-1512] [java api] Add CsvReader for
reading into POJOs
Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java
new file mode 100644
index 0000000..6a614e9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.io;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class CsvReaderWithPOJOITCase extends MultipleProgramsTestBase {
+ private String resultPath;
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public CsvReaderWithPOJOITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile("result").toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ private String createInputData(String data) throws Exception {
+ File file = tempFolder.newFile("input");
+ Files.write(data, file, Charsets.UTF_8);
+
+ return file.toURI().toString();
+ }
+
+ @Test
+ public void testPOJOType() throws Exception {
+ final String inputData = "ABC,2.20,3\nDEF,5.1,5\nDEF,3.30,1\nGHI,3.30,10";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJOItem> data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f1", "f3", "f2"});
+ data.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "ABC,3,2.20\nDEF,5,5.10\nDEF,1,3.30\nGHI,10,3.30";
+ }
+
+ @Test
+ public void testPOJOTypeWithFieldsOrder() throws Exception {
+ final String inputData = "2.20,ABC,3\n5.1,DEF,5\n3.30,DEF,1\n3.30,GHI,10";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJOItem> data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f3", "f1", "f2"});
+ data.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "ABC,3,2.20\nDEF,5,5.10\nDEF,1,3.30\nGHI,10,3.30";
+ }
+
+ @Test
+ public void testPOJOTypeWithoutFieldsOrder() throws Exception {
+ final String inputData = "";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ try {
+ env.readCsvFile(dataPath).pojoType(POJOItem.class, null);
+ fail("POJO type without fields order must raise NullPointerException!");
+ } catch (NullPointerException e) {
+ // success
+ }
+
+ expected = "";
+ resultPath = dataPath;
+ }
+
+ @Test
+ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception {
+ final String inputData = "3,2.20,ABC\n5,5.1,DEF\n1,3.30,DEF\n10,3.30,GHI";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJOItem> data = env.readCsvFile(dataPath).includeFields(true, false, true).pojoType(POJOItem.class, new String[]{"f2", "f1"});
+ data.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00";
+ }
+
+ public static class POJOItem {
+ public String f1;
+ private int f2;
+ public double f3;
+
+ public int getF2() {
+ return f2;
+ }
+
+ public void setF2(int f2) {
+ this.f2 = f2;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s,%d,%.02f", f1, f2, f3);
+ }
+ }
+}
[2/5] flink git commit: [FLINK-1512] [scala api] Add CsvReader for
reading into POJOs
Posted by fh...@apache.org.
[FLINK-1512] [scala api] Add CsvReader for reading into POJOs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a6f2960
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a6f2960
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a6f2960
Branch: refs/heads/master
Commit: 7a6f296094b26b940f9f9f66f64e5e2a0f700cb1
Parents: 7b1c19c
Author: Chiwan Park <ch...@icloud.com>
Authored: Fri Feb 20 02:23:56 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 25 20:38:59 2015 +0100
----------------------------------------------------------------------
.../scala/operators/ScalaCsvInputFormat.java | 270 ++++++++-----------
.../flink/api/scala/ExecutionEnvironment.scala | 47 +++-
.../flink/api/scala/io/CsvInputFormatTest.scala | 125 ++++++++-
.../scala/io/ScalaCsvReaderWithPOJOITCase.scala | 124 +++++++++
4 files changed, 378 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a6f2960/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index 79c6659..9adbed8 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -19,66 +19,91 @@
package org.apache.flink.api.scala.operators;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.types.parser.FieldParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.lang.reflect.Field;
+import java.util.Arrays;
-import scala.Product;
-
-public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
+public class ScalaCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
-
- private transient Object[] parsedValues;
-
- // To speed up readRecord processing. Used to find windows line endings.
- // It is set when open so that readRecord does not have to evaluate it
- private boolean lineDelimiterIsLinebreak = false;
- private final TupleSerializerBase<OUT> serializer;
+ private transient Object[] parsedValues;
- private byte[] commentPrefix = null;
+ private final TupleSerializerBase<OUT> tupleSerializer;
- private transient int commentCount;
- private transient int invalidLineCount;
+ private Class<OUT> pojoTypeClass = null;
+ private String[] pojoFieldsName = null;
+ private transient Field[] pojoFields = null;
+ private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
super(filePath);
- if (!(typeInfo.isTupleType())) {
- throw new UnsupportedOperationException("This only works on tuple types.");
+ Class<?>[] classes = new Class[typeInfo.getArity()];
+
+ if (typeInfo instanceof TupleTypeInfoBase) {
+ TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
+ // We can use an empty config here, since we only use the serializer to create
+ // the top-level case class
+ tupleSerializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig());
+
+ for (int i = 0; i < tupleType.getArity(); i++) {
+ classes[i] = tupleType.getTypeAt(i).getTypeClass();
+ }
+
+ setFieldTypes(classes);
+ } else {
+ tupleSerializer = null;
+ pojoTypeInfo = (PojoTypeInfo<OUT>) typeInfo;
+ pojoTypeClass = typeInfo.getTypeClass();
+ pojoFieldsName = pojoTypeInfo.getFieldNames();
+
+ for (int i = 0, arity = pojoTypeInfo.getArity(); i < arity; i++) {
+ classes[i] = pojoTypeInfo.getTypeAt(i).getTypeClass();
+ }
+
+ setFieldTypes(classes);
+ setOrderOfPOJOFields(pojoFieldsName);
+ }
+ }
+
+ public void setOrderOfPOJOFields(String[] fieldsOrder) {
+ Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
+ Preconditions.checkNotNull(fieldsOrder);
+
+ int includedCount = 0;
+ for (boolean isIncluded : fieldIncluded) {
+ if (isIncluded) {
+ includedCount++;
+ }
}
- TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
- // We can use an empty config here, since we only use the serializer to create
- // the top-level case class
- serializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig());
-
- Class<?>[] classes = new Class[tupleType.getArity()];
- for (int i = 0; i < tupleType.getArity(); i++) {
- classes[i] = tupleType.getTypeAt(i).getTypeClass();
+
+ Preconditions.checkArgument(includedCount == fieldsOrder.length,
+ "The number of selected POJO fields should be the same as that of CSV fields.");
+
+ for (String field : fieldsOrder) {
+ Preconditions.checkNotNull(field, "The field name cannot be null.");
+ Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+ "The given field name isn't matched to POJO fields.");
}
- setFieldTypes(classes);
+
+ pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
}
public void setFieldTypes(Class<?>[] fieldTypes) {
@@ -98,98 +123,66 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}
- public byte[] getCommentPrefix() {
- return commentPrefix;
- }
-
- public void setCommentPrefix(byte[] commentPrefix) {
- this.commentPrefix = commentPrefix;
- }
-
- public void setCommentPrefix(char commentPrefix) {
- setCommentPrefix(String.valueOf(commentPrefix));
- }
+ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
+ Preconditions.checkNotNull(sourceFieldMask);
+ Preconditions.checkNotNull(fieldTypes);
- public void setCommentPrefix(String commentPrefix) {
- setCommentPrefix(commentPrefix, Charsets.UTF_8);
+ setFieldsGeneric(sourceFieldMask, fieldTypes);
}
- public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
- if (charsetName == null) {
- throw new IllegalArgumentException("Charset name must not be null");
- }
-
- if (commentPrefix != null) {
- Charset charset = Charset.forName(charsetName);
- setCommentPrefix(commentPrefix, charset);
- } else {
- this.commentPrefix = null;
- }
+ public Class<?>[] getFieldTypes() {
+ return super.getGenericFieldTypes();
}
- public void setCommentPrefix(String commentPrefix, Charset charset) {
- if (charset == null) {
- throw new IllegalArgumentException("Charset must not be null");
- }
- if (commentPrefix != null) {
- this.commentPrefix = commentPrefix.getBytes(charset);
- } else {
- this.commentPrefix = null;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (this.invalidLineCount > 0) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
- }
- }
-
- if (this.commentCount > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
- }
- }
- super.close();
- }
-
- @Override
- public OUT nextRecord(OUT record) throws IOException {
- OUT returnRecord = null;
- do {
- returnRecord = super.nextRecord(record);
- } while (returnRecord == null && !reachedEnd());
-
- return returnRecord;
- }
-
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
-
+
@SuppressWarnings("unchecked")
FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
-
+
//throw exception if no field parsers are available
if (fieldParsers.length == 0) {
throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
}
-
+
// create the value holders
this.parsedValues = new Object[fieldParsers.length];
for (int i = 0; i < fieldParsers.length; i++) {
this.parsedValues[i] = fieldParsers[i].createValue();
}
- this.commentCount = 0;
- this.invalidLineCount = 0;
-
// left to right evaluation makes access [0] okay
// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
this.lineDelimiterIsLinebreak = true;
}
+
+ // for POJO type
+ if (pojoTypeClass != null) {
+ pojoFields = new Field[pojoFieldsName.length];
+ for (int i = 0; i < pojoFieldsName.length; i++) {
+ try {
+ pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
+ pojoFields[i].setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
+ }
+ }
+ }
+
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
+ }
+
+ @Override
+ public OUT nextRecord(OUT record) throws IOException {
+ OUT returnRecord = null;
+ do {
+ returnRecord = super.nextRecord(record);
+ } while (returnRecord == null && !reachedEnd());
+
+ return returnRecord;
}
@Override
@@ -219,73 +212,22 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
}
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
- OUT result = serializer.createInstance(parsedValues);
- return result;
+ if (tupleSerializer != null) {
+ return tupleSerializer.createInstance(parsedValues);
+ } else {
+ for (int i = 0; i < pojoFields.length; i++) {
+ try {
+ pojoFields[i].set(reuse, parsedValues[i]);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
+ }
+ }
+
+ return reuse;
+ }
} else {
this.invalidLineCount++;
return null;
}
}
-
-
- @Override
- public String toString() {
- return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
- }
-
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("unused")
- private static void checkAndCoSort(int[] positions, Class<?>[] types) {
- if (positions.length != types.length) {
- throw new IllegalArgumentException("The positions and types must be of the same length");
- }
-
- TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
-
- for (int i = 0; i < positions.length; i++) {
- if (positions[i] < 0) {
- throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
- }
- if (types[i] == null) {
- throw new IllegalArgumentException("The type " + i + " is invalid (null)");
- }
-
- if (map.containsKey(positions[i])) {
- throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
- }
-
- map.put(positions[i], types[i]);
- }
-
- int i = 0;
- for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
- positions[i] = entry.getKey();
- types[i] = entry.getValue();
- i++;
- }
- }
-
- private static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
- if (positions.length != types.length) {
- throw new IllegalArgumentException("The positions and types must be of the same length");
- }
-
- int lastPos = -1;
-
- for (int i = 0; i < positions.length; i++) {
- if (positions[i] < 0) {
- throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
- }
- if (types[i] == null) {
- throw new IllegalArgumentException("The type " + i + " is invalid (null)");
- }
-
- if (positions[i] <= lastPos) {
- throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
- }
-
- lastPos = positions[i];
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a6f2960/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4c1e627..7073f07 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.io._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
-import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, ValueTypeInfo, TupleTypeInfoBase}
import org.apache.flink.api.scala.hadoop.mapred
import org.apache.flink.api.scala.hadoop.mapreduce
import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.{Path => HadoopPath}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -243,8 +244,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* @param lenient Whether the parser should silently ignore malformed lines.
* @param includedFields The fields in the file that should be read. Per default all fields
* are read.
+ * @param pojoFields The fields of the POJO which are mapped to CSV fields.
*/
- def readCsvFile[T <: Product : ClassTag : TypeInformation](
+ def readCsvFile[T : ClassTag : TypeInformation](
filePath: String,
lineDelimiter: String = "\n",
fieldDelimiter: String = ",",
@@ -252,9 +254,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
ignoreFirstLine: Boolean = false,
ignoreComments: String = null,
lenient: Boolean = false,
- includedFields: Array[Int] = null): DataSet[T] = {
+ includedFields: Array[Int] = null,
+ pojoFields: Array[String] = null): DataSet[T] = {
- val typeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
+ val typeInfo = implicitly[TypeInformation[T]]
val inputFormat = new ScalaCsvInputFormat[T](new Path(filePath), typeInfo)
inputFormat.setDelimiter(lineDelimiter)
@@ -267,16 +270,40 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
inputFormat.enableQuotedStringParsing(quoteCharacter);
}
- val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
- for (i <- 0 until typeInfo.getArity) {
- classes(i) = typeInfo.getTypeAt(i).getTypeClass
+ val classesBuf: ArrayBuffer[Class[_]] = new ArrayBuffer[Class[_]]
+ typeInfo match {
+ case info: TupleTypeInfoBase[T] =>
+ for (i <- 0 until info.getArity) {
+ classesBuf += info.getTypeAt(i).getTypeClass()
+ }
+ case info: PojoTypeInfo[T] =>
+ if (pojoFields == null) {
+ throw new IllegalArgumentException(
+ "POJO fields must be specified (not null) if output type is a POJO.")
+ } else {
+ for (i <- 0 until pojoFields.length) {
+ val pos = info.getFieldIndex(pojoFields(i))
+ if (pos < 0) {
+ throw new IllegalArgumentException(
+ "Field \"" + pojoFields(i) + "\" not part of POJO type " +
+ info.getTypeClass.getCanonicalName);
+ }
+ classesBuf += info.getPojoFieldAt(pos).`type`.getTypeClass
+ }
+ }
+ case _ => throw new IllegalArgumentException("Type information is not valid.")
}
+
if (includedFields != null) {
- Validate.isTrue(typeInfo.getArity == includedFields.length, "Number of tuple fields and" +
+ Validate.isTrue(classesBuf.size == includedFields.length, "Number of tuple fields and" +
" included fields must match.")
- inputFormat.setFields(includedFields, classes)
+ inputFormat.setFields(includedFields, classesBuf.toArray)
} else {
- inputFormat.setFieldTypes(classes)
+ inputFormat.setFieldTypes(classesBuf.toArray)
+ }
+
+ if (pojoFields != null) {
+ inputFormat.setOrderOfPOJOFields(pojoFields)
}
wrap(new DataSource[T](javaEnv, inputFormat, typeInfo, getCallLocationName()))
http://git-wip-us.apache.org/repos/asf/flink/blob/7a6f2960/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 9964a9d..4bcd35a 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -17,21 +17,15 @@
*/
package org.apache.flink.api.scala.io
+import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
-import org.junit.Assert._
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertNotNull
-import org.junit.Assert.assertNull
-import org.junit.Assert.assertTrue
-import org.junit.Assert.fail
-import java.io.File
-import java.io.FileOutputStream
-import java.io.FileWriter
-import java.io.OutputStreamWriter
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.{FileInputSplit, Path}
+import org.junit.Assert.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
import org.junit.Test
-import org.apache.flink.api.scala._
class CsvInputFormatTest {
@@ -315,7 +309,8 @@ class CsvInputFormatTest {
PATH,
createTypeInformation[(Int, Int, Int)])
format.setFieldDelimiter("|")
- format.setFields(Array(0, 3, 7), Array(classOf[Integer], classOf[Integer], classOf[Integer]))
+ format.setFields(Array(0, 3, 7),
+ Array(classOf[Integer], classOf[Integer], classOf[Integer]): Array[Class[_]])
format.configure(new Configuration)
format.open(split)
var result: (Int, Int, Int) = null
@@ -347,7 +342,8 @@ class CsvInputFormatTest {
createTypeInformation[(Int, Int, Int)])
format.setFieldDelimiter("|")
try {
- format.setFields(Array(8, 1, 3), Array(classOf[Integer],classOf[Integer],classOf[Integer]))
+ format.setFields(Array(8, 1, 3),
+ Array(classOf[Integer], classOf[Integer], classOf[Integer]): Array[Class[_]])
fail("Input sequence should have been rejected.")
}
catch {
@@ -408,5 +404,106 @@ class CsvInputFormatTest {
fail("Test erroneous")
}
}
-}
+ class POJOItem(var field1: Int, var field2: String, var field3: Double) {
+ def this() {
+ this(-1, "", -1)
+ }
+ }
+
+ case class CaseClassItem(field1: Int, field2: String, field3: Double)
+
+ @Test
+ def testPOJOType(): Unit = {
+ val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
+ val tempFile = createTempFile(fileContent)
+ val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+ val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
+
+ format.setDelimiter('\n')
+ format.setFieldDelimiter(',')
+ format.configure(new Configuration)
+ format.open(tempFile)
+
+ var result = new POJOItem()
+ result = format.nextRecord(result)
+ assertEquals(123, result.field1)
+ assertEquals("HELLO", result.field2)
+ assertEquals(3.123, result.field3, 0.001)
+
+ result = format.nextRecord(result)
+ assertEquals(456, result.field1)
+ assertEquals("ABC", result.field2)
+ assertEquals(1.234, result.field3, 0.001)
+ }
+
+ @Test
+ def testCaseClass(): Unit = {
+ val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
+ val tempFile = createTempFile(fileContent)
+ val typeInfo: TypeInformation[CaseClassItem] = createTypeInformation[CaseClassItem]
+ val format = new ScalaCsvInputFormat[CaseClassItem](PATH, typeInfo)
+
+ format.setDelimiter('\n')
+ format.setFieldDelimiter(',')
+ format.configure(new Configuration)
+ format.open(tempFile)
+
+ var result = format.nextRecord(null)
+ assertEquals(123, result.field1)
+ assertEquals("HELLO", result.field2)
+ assertEquals(3.123, result.field3, 0.001)
+
+ result = format.nextRecord(null)
+ assertEquals(456, result.field1)
+ assertEquals("ABC", result.field2)
+ assertEquals(1.234, result.field3, 0.001)
+ }
+
+ @Test
+ def testPOJOTypeWithFieldMapping(): Unit = {
+ val fileContent = "HELLO,123,3.123\n" + "ABC,456,1.234"
+ val tempFile = createTempFile(fileContent)
+ val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+ val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
+
+ format.setDelimiter('\n')
+ format.setFieldDelimiter(',')
+ format.setFieldTypes(Array(classOf[String], classOf[Integer], classOf[java.lang.Double]))
+ format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
+ format.configure(new Configuration)
+ format.open(tempFile)
+
+ var result = new POJOItem()
+ result = format.nextRecord(result)
+ assertEquals(123, result.field1)
+ assertEquals("HELLO", result.field2)
+ assertEquals(3.123, result.field3, 0.001)
+
+ result = format.nextRecord(result)
+ assertEquals(456, result.field1)
+ assertEquals("ABC", result.field2)
+ assertEquals(1.234, result.field3, 0.001)
+ }
+
+ @Test
+ def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
+ val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
+ val tempFile = createTempFile(fileContent)
+ val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+ val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
+
+ format.setDelimiter('\n')
+ format.setFieldDelimiter(',')
+ format.setFields(Array(false, true), Array(classOf[String]): Array[Class[_]])
+ format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
+ format.configure(new Configuration)
+ format.open(tempFile)
+
+ var result = format.nextRecord(new POJOItem())
+ assertEquals("HELLO", result.field2)
+
+ result = format.nextRecord(result)
+ assertEquals("ABC", result.field2)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a6f2960/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
new file mode 100644
index 0000000..21aa93d
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.io
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.Assert._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class ScalaCsvReaderWithPOJOITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+ private val _tempFolder = new TemporaryFolder()
+ private var resultPath: String = null
+ private var expected: String = null
+
+ @Rule
+ def tempFolder = _tempFolder
+
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile("result").toURI.toString
+ }
+
+ @After
+ def after(): Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
+
+ def createInputData(data: String): String = {
+ val dataFile = tempFolder.newFile("data")
+ Files.write(data, dataFile, Charsets.UTF_8)
+ dataFile.toURI.toString
+ }
+
+ @Test
+ def testPOJOType(): Unit = {
+ val dataPath = createInputData("ABC,2.20,3\nDEF,5.1,5\nDEF,3.30,1\nGHI,3.30,10")
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val data = env.readCsvFile[POJOItem](dataPath, pojoFields = Array("f1", "f2", "f3"))
+
+ implicit val typeInfo = createTypeInformation[(String, Int)]
+ data.writeAsText(resultPath, WriteMode.OVERWRITE)
+
+ env.execute()
+
+ expected = "ABC,2.20,3\nDEF,5.10,5\nDEF,3.30,1\nGHI,3.30,10"
+ }
+
+ @Test
+ def testPOJOTypeWithFieldsOrder(): Unit = {
+ val dataPath = createInputData("2.20,ABC,3\n5.1,DEF,5\n3.30,DEF,1\n3.30,GHI,10")
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val data = env.readCsvFile[POJOItem](dataPath, pojoFields = Array("f2", "f1", "f3"))
+
+ implicit val typeInfo = createTypeInformation[(String, Int)]
+ data.writeAsText(resultPath, WriteMode.OVERWRITE)
+
+ env.execute()
+
+ expected = "ABC,2.20,3\nDEF,5.10,5\nDEF,3.30,1\nGHI,3.30,10"
+ }
+
+ @Test
+ def testPOJOTypeWithoutFieldsOrder(): Unit = {
+ val dataPath = createInputData("")
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ try {
+ val data = env.readCsvFile[POJOItem](dataPath)
+ fail("POJO type without fields order must raise IllegalArgumentException!")
+ } catch {
+ case _: IllegalArgumentException => // success
+ }
+
+ expected = ""
+ resultPath = dataPath
+ }
+
+ @Test
+ def testPOJOTypeWithFieldsOrderAndFieldsSelection(): Unit = {
+ val dataPath = createInputData("2.20,3,ABC\n5.1,5,DEF\n3.30,1,DEF\n3.30,10,GHI")
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val data = env.readCsvFile[POJOItem](dataPath, includedFields = Array(1, 2),
+ pojoFields = Array("f3", "f1"))
+
+ implicit val typeInfo = createTypeInformation[(String, Int)]
+ data.writeAsText(resultPath, WriteMode.OVERWRITE)
+
+ env.execute()
+
+ expected = "ABC,0.00,3\nDEF,0.00,5\nDEF,0.00,1\nGHI,0.00,10"
+ }
+}
+
+class POJOItem(var f1: String, var f2: Double, var f3: Int) {
+ def this() {
+ this("", 0.0, 0)
+ }
+
+ override def toString: String = "%s,%.02f,%d".format(f1, f2, f3)
+}
[5/5] flink git commit: [FLINK-1512] [documentation] Added CSV to
POJO feature to documentation.
Posted by fh...@apache.org.
[FLINK-1512] [documentation] Added CSV to POJO feature to documentation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b42b620
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b42b620
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b42b620
Branch: refs/heads/master
Commit: 1b42b6206845f574dc4f4ef5e47ab08be75e1cc2
Parents: 43ac967
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Mar 25 16:10:11 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 25 20:39:00 2015 +0100
----------------------------------------------------------------------
docs/programming_guide.md | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b42b620/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 3fb1702..f3a8bce 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -1548,7 +1548,7 @@ File-based:
StringValues. StringValues are mutable strings.
- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
- Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field
+ Returns a DataSet of tuples or POJOs. Supports the basic java types and their Value counterparts as field
types.
- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`.
@@ -1596,6 +1596,10 @@ DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file
.includeFields("10010") // take the first and the fourth field
.types(String.class, Double.class);
+// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
+DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
+ .pojoType(Person.class, "name", "age", "zipcode");
+
// create a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
@@ -1678,7 +1682,7 @@ File-based:
StringValues. StringValues are mutable strings.
- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
- Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field
+ Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic java types and their Value counterparts as field
types.
Collection-based:
@@ -1724,11 +1728,16 @@ val csvInput = env.readCsvFile[(String, Double)](
includedFields = Array(0, 3)) // take the first and the fourth field
// CSV input can also be used with Case Classes
-case class MyInput(str: String, dbl: Double)
-val csvInput = env.readCsvFile[MyInput](
+case class MyCaseClass(str: String, dbl: Double)
+val csvInput = env.readCsvFile[MyCaseClass](
"hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field
+// read a CSV file with three fields into a POJO (Person) with corresponding fields
+val csvInput = env.readCsvFile[Person](
+ "hdfs:///the/CSV/file",
+ pojoFields = Array("name", "age", "zipcode"))
+
// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")
@@ -1747,6 +1756,8 @@ Flink offers a number of configuration options for CSV parsing:
- `includeFields: Array[Int]` defines which fields to read from the input file (and which to ignore). By default the first *n* fields (as defined by the number of types in the `types()` call) are parsed.
+- `pojoFields: Array[String]` specifies the fields of a POJO that are mapped to CSV fields. Parsers for CSV fields are automatically initialized based on the type and order of the POJO fields.
+
- `parseQuotedStrings: Character` enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or tailing whitespaces are *not* trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character. If quoted string parsing is enabled and the first character of the field is *not* the quoting string, the string is parsed as unquoted string. By default, quoted string parsing is disabled.
- `ignoreComments: String` specifies a comment prefix. All lines that start with the specified comment prefix are not parsed and ignored. By default, no lines are ignored.
@@ -1754,7 +1765,6 @@ Flink offers a number of configuration options for CSV parsing:
- `lenient: Boolean` enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.
- `ignoreFirstLine: Boolean` configures the InputFormat to ignore the first line of the input file. By default no line is ignored.
-
#### Recursive Traversal of the Input Path Directory