You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/18 21:42:45 UTC
[2/2] flink git commit: [FLINK-2692] Untangle CsvInputFormat
[FLINK-2692] Untangle CsvInputFormat
This closes #1266
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd61f2db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd61f2db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd61f2db
Branch: refs/heads/master
Commit: bd61f2dbdf1a0215363ffa8416329e1dbf277593
Parents: fc6fec7
Author: zentol <ch...@apache.org>
Authored: Sun Oct 18 20:23:23 2015 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Nov 18 21:42:00 2015 +0100
----------------------------------------------------------------------
.../wordcount/BoltTokenizerWordCountPojo.java | 3 +-
.../BoltTokenizerWordCountWithNames.java | 3 +-
.../flink/api/java/io/CommonCsvInputFormat.java | 258 -------------------
.../flink/api/java/io/CsvInputFormat.java | 127 ++++++++-
.../org/apache/flink/api/java/io/CsvReader.java | 125 ++++-----
.../flink/api/java/io/PojoCsvInputFormat.java | 199 ++++++++++++++
.../flink/api/java/io/TupleCsvInputFormat.java | 90 +++++++
.../flink/api/java/tuple/TupleGenerator.java | 13 +-
.../java/typeutils/runtime/TupleSerializer.java | 8 +
.../typeutils/runtime/TupleSerializerBase.java | 2 +
.../flink/api/java/io/CsvInputFormatTest.java | 96 ++-----
.../flink/python/api/PythonPlanBinder.java | 13 +-
.../optimizer/ReplicatingDataSourceTest.java | 26 +-
.../scala/operators/ScalaCsvInputFormat.java | 65 -----
.../scala/operators/ScalaCsvOutputFormat.java | 6 +-
.../flink/api/scala/ExecutionEnvironment.scala | 56 ++--
.../scala/typeutils/CaseClassSerializer.scala | 4 +
.../flink/api/scala/io/CsvInputFormatTest.scala | 78 ++----
18 files changed, 573 insertions(+), 599 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
index 9bdcead..e093714 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -20,6 +20,7 @@ package org.apache.flink.storm.wordcount;
import backtype.storm.topology.IRichBolt;
import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -119,7 +120,7 @@ public class BoltTokenizerWordCountPojo {
// read the text file from given input path
PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor
.getForObject(new Sentence(""));
- return env.createInput(new CsvInputFormat<Sentence>(new Path(
+ return env.createInput(new PojoCsvInputFormat<Sentence>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
sourceType);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
index 019f1bc..f5a1a35 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -21,6 +21,7 @@ import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -122,7 +123,7 @@ public class BoltTokenizerWordCountWithNames {
// read the text file from given input path
TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
.getForObject(new Tuple1<String>(""));
- return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
+ return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
sourceType);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
deleted file mode 100644
index 444d151..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class CommonCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- public static final String DEFAULT_LINE_DELIMITER = "\n";
-
- public static final String DEFAULT_FIELD_DELIMITER = ",";
-
- protected transient Object[] parsedValues;
-
- private final Class<OUT> pojoTypeClass;
-
- private String[] pojoFieldNames;
-
- private transient PojoTypeInfo<OUT> pojoTypeInfo;
- private transient Field[] pojoFields;
-
- public CommonCsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
- this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
- }
-
- public CommonCsvInputFormat(
- Path filePath,
- String lineDelimiter,
- String fieldDelimiter,
- CompositeType<OUT> compositeTypeInfo) {
- super(filePath);
-
- setDelimiter(lineDelimiter);
- setFieldDelimiter(fieldDelimiter);
-
- Class<?>[] classes = new Class<?>[compositeTypeInfo.getArity()];
-
- for (int i = 0; i < compositeTypeInfo.getArity(); i++) {
- classes[i] = compositeTypeInfo.getTypeAt(i).getTypeClass();
- }
-
- setFieldTypes(classes);
-
- if (compositeTypeInfo instanceof PojoTypeInfo) {
- pojoTypeInfo = (PojoTypeInfo<OUT>) compositeTypeInfo;
-
- pojoTypeClass = compositeTypeInfo.getTypeClass();
- setOrderOfPOJOFields(compositeTypeInfo.getFieldNames());
- } else {
- pojoTypeClass = null;
- pojoFieldNames = null;
- }
- }
-
- public void setOrderOfPOJOFields(String[] fieldNames) {
- Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
- Preconditions.checkNotNull(fieldNames);
-
- int includedCount = 0;
- for (boolean isIncluded : fieldIncluded) {
- if (isIncluded) {
- includedCount++;
- }
- }
-
- Preconditions.checkArgument(includedCount == fieldNames.length, includedCount +
- " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
-
- for (String field : fieldNames) {
- Preconditions.checkNotNull(field, "The field name cannot be null.");
- Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
- "Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
- }
-
- pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length);
- }
-
- public void setFieldTypes(Class<?>... fieldTypes) {
- if (fieldTypes == null || fieldTypes.length == 0) {
- throw new IllegalArgumentException("Field types must not be null or empty.");
- }
-
- setFieldTypesGeneric(fieldTypes);
- }
-
- public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(sourceFieldIndices);
- Preconditions.checkNotNull(fieldTypes);
-
- checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
- setFieldsGeneric(sourceFieldIndices, fieldTypes);
- }
-
- public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
- Preconditions.checkNotNull(sourceFieldMask);
- Preconditions.checkNotNull(fieldTypes);
-
- setFieldsGeneric(sourceFieldMask, fieldTypes);
- }
-
- public Class<?>[] getFieldTypes() {
- return super.getGenericFieldTypes();
- }
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
-
- @SuppressWarnings("unchecked")
- FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
-
- //throw exception if no field parsers are available
- if (fieldParsers.length == 0) {
- throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
- }
-
- // create the value holders
- this.parsedValues = new Object[fieldParsers.length];
- for (int i = 0; i < fieldParsers.length; i++) {
- this.parsedValues[i] = fieldParsers[i].createValue();
- }
-
- // left to right evaluation makes access [0] okay
- // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
- if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
- this.lineDelimiterIsLinebreak = true;
- }
-
- // for POJO type
- if (pojoTypeClass != null) {
- pojoFields = new Field[pojoFieldNames.length];
-
- Map<String, Field> allFields = new HashMap<String, Field>();
-
- findAllFields(pojoTypeClass, allFields);
-
- for (int i = 0; i < pojoFieldNames.length; i++) {
- pojoFields[i] = allFields.get(pojoFieldNames[i]);
-
- if (pojoFields[i] != null) {
- pojoFields[i].setAccessible(true);
- } else {
- throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
- }
- }
- }
-
- this.commentCount = 0;
- this.invalidLineCount = 0;
- }
-
- /**
- * Finds all declared fields in a class and all its super classes.
- *
- * @param clazz Class for which all declared fields are found
- * @param allFields Map containing all found fields so far
- */
- private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
- for (Field field: clazz.getDeclaredFields()) {
- allFields.put(field.getName(), field);
- }
-
- if (clazz.getSuperclass() != null) {
- findAllFields(clazz.getSuperclass(), allFields);
- }
- }
-
- @Override
- public OUT nextRecord(OUT record) throws IOException {
- OUT returnRecord = null;
- do {
- returnRecord = super.nextRecord(record);
- } while (returnRecord == null && !reachedEnd());
-
- return returnRecord;
- }
-
- @Override
- public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
- /*
- * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
- */
- //Find windows end line, so find carriage return before the newline
- if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
- //reduce the number of bytes so that the Carriage return is not taken as data
- numBytes--;
- }
-
- if (commentPrefix != null && commentPrefix.length <= numBytes) {
- //check record for comments
- boolean isComment = true;
- for (int i = 0; i < commentPrefix.length; i++) {
- if (commentPrefix[i] != bytes[offset + i]) {
- isComment = false;
- break;
- }
- }
- if (isComment) {
- this.commentCount++;
- return null;
- }
- }
-
- if (parseRecord(parsedValues, bytes, offset, numBytes)) {
- if (pojoTypeClass == null) {
- // result type is tuple
- return createTuple(reuse);
- } else {
- // result type is POJO
- for (int i = 0; i < parsedValues.length; i++) {
- try {
- pojoFields[i].set(reuse, parsedValues[i]);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
- }
- }
- return reuse;
- }
-
- } else {
- this.invalidLineCount++;
- return null;
- }
- }
-
- protected abstract OUT createTuple(OUT reuse);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 7d86f39..8f0aa64 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -18,32 +18,133 @@
package org.apache.flink.api.java.io;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringUtils;
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
+
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+ protected transient Object[] parsedValues;
- public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
- super(filePath, typeInformation);
+ protected CsvInputFormat(Path filePath) {
+ super(filePath);
}
-
- public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
- super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+ @Override
+ public void open(FileInputSplit split) throws IOException {
+ super.open(split);
+
+ @SuppressWarnings("unchecked")
+ FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+ //throw exception if no field parsers are available
+ if (fieldParsers.length == 0) {
+ throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+ }
+
+ // create the value holders
+ this.parsedValues = new Object[fieldParsers.length];
+ for (int i = 0; i < fieldParsers.length; i++) {
+ this.parsedValues[i] = fieldParsers[i].createValue();
+ }
+
+ // left to right evaluation makes access [0] okay
+ // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
+ if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+ this.lineDelimiterIsLinebreak = true;
+ }
+
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
+ }
+
+ @Override
+ public OUT nextRecord(OUT record) throws IOException {
+ OUT returnRecord = null;
+ do {
+ returnRecord = super.nextRecord(record);
+ } while (returnRecord == null && !reachedEnd());
+
+ return returnRecord;
}
@Override
- protected OUT createTuple(OUT reuse) {
- Tuple result = (Tuple) reuse;
- for (int i = 0; i < parsedValues.length; i++) {
- result.setField(parsedValues[i], i);
+ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+ /*
+ * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
+ */
+ //Find windows end line, so find carriage return before the newline
+ if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+ //reduce the number of bytes so that the Carriage return is not taken as data
+ numBytes--;
+ }
+
+ if (commentPrefix != null && commentPrefix.length <= numBytes) {
+ //check record for comments
+ boolean isComment = true;
+ for (int i = 0; i < commentPrefix.length; i++) {
+ if (commentPrefix[i] != bytes[offset + i]) {
+ isComment = false;
+ break;
+ }
+ }
+ if (isComment) {
+ this.commentCount++;
+ return null;
+ }
+ }
+
+ if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+ return fillRecord(reuse, parsedValues);
+ } else {
+ this.invalidLineCount++;
+ return null;
+ }
+ }
+
+ protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
+
+ public Class<?>[] getFieldTypes() {
+ return super.getGenericFieldTypes();
+ }
+
+ protected static boolean[] createDefaultMask(int size) {
+ boolean[] includedMask = new boolean[size];
+ for (int x=0; x<includedMask.length; x++) {
+ includedMask[x] = true;
+ }
+ return includedMask;
+ }
+
+ protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
+ Preconditions.checkNotNull(sourceFieldIndices);
+
+ for (int i : sourceFieldIndices) {
+ if (i < 0) {
+ throw new IllegalArgumentException("Field indices must not be smaller than zero.");
+ }
+ }
+
+ boolean[] includedMask = new boolean[Ints.max(sourceFieldIndices) + 1];
+
+ // check if we support parsers for these types
+ for (int i = 0; i < sourceFieldIndices.length; i++) {
+ includedMask[sourceFieldIndices[i]] = true;
}
- return reuse;
+ return includedMask;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 34b5b47..1bb2eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -292,19 +292,10 @@ public class CsvReader {
@SuppressWarnings("unchecked")
PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType);
- CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
- Class<?>[] classes = new Class<?>[pojoFields.length];
- for (int i = 0; i < pojoFields.length; i++) {
- int pos = typeInfo.getFieldIndex(pojoFields[i]);
- if(pos < 0) {
- throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName());
- }
- classes[i] = typeInfo.getPojoFieldAt(pos).getTypeInformation().getTypeClass();
- }
+ CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask);
- configureInputFormat(inputFormat, classes);
- inputFormat.setOrderOfPOJOFields(pojoFields);
+ configureInputFormat(inputFormat);
return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
}
@@ -325,14 +316,14 @@ public class CsvReader {
@SuppressWarnings("unchecked")
TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
- CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
+ CsvInputFormat<T> inputFormat = new TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask);
Class<?>[] classes = new Class<?>[typeInfo.getArity()];
for (int i = 0; i < typeInfo.getArity(); i++) {
classes[i] = typeInfo.getTypeAt(i).getTypeClass();
}
- configureInputFormat(inputFormat, classes);
+ configureInputFormat(inputFormat);
return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
}
@@ -340,7 +331,7 @@ public class CsvReader {
// Miscellaneous
// --------------------------------------------------------------------------------------------
- private void configureInputFormat(CsvInputFormat<?> format, Class<?>... types) {
+ private void configureInputFormat(CsvInputFormat<?> format) {
format.setDelimiter(this.lineDelimiter);
format.setFieldDelimiter(this.fieldDelimiter);
format.setCommentPrefix(this.commentPrefix);
@@ -349,12 +340,6 @@ public class CsvReader {
if (this.parseQuotedStrings) {
format.enableQuotedStringParsing(this.quoteCharacter);
}
-
- if (this.includedMask == null) {
- format.setFieldTypes(types);
- } else {
- format.setFields(this.includedMask, types);
- }
}
// --------------------------------------------------------------------------------------------
@@ -374,8 +359,8 @@ public class CsvReader {
*/
public <T0> DataSource<Tuple1<T0>> types(Class<T0> type0) {
TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0);
- CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path, types);
- configureInputFormat(inputFormat, type0);
+ CsvInputFormat<Tuple1<T0>> inputFormat = new TupleCsvInputFormat<Tuple1<T0>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -391,8 +376,8 @@ public class CsvReader {
*/
public <T0, T1> DataSource<Tuple2<T0, T1>> types(Class<T0> type0, Class<T1> type1) {
TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1);
- CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path, types);
- configureInputFormat(inputFormat, type0, type1);
+ CsvInputFormat<Tuple2<T0, T1>> inputFormat = new TupleCsvInputFormat<Tuple2<T0, T1>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -409,8 +394,8 @@ public class CsvReader {
*/
public <T0, T1, T2> DataSource<Tuple3<T0, T1, T2>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2) {
TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2);
- CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2);
+ CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new TupleCsvInputFormat<Tuple3<T0, T1, T2>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -428,8 +413,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3> DataSource<Tuple4<T0, T1, T2, T3>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) {
TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3);
- CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3);
+ CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new TupleCsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -448,8 +433,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4> DataSource<Tuple5<T0, T1, T2, T3, T4>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4) {
TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4);
- CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
+ CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new TupleCsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -469,8 +454,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5> DataSource<Tuple6<T0, T1, T2, T3, T4, T5>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5) {
TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5);
- CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
+ CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new TupleCsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -491,8 +476,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6> DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6) {
TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
- CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
+ CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new TupleCsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -514,8 +499,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7> DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7) {
TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
- CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
+ CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new TupleCsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -538,8 +523,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8> DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8) {
TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
- CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
+ CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new TupleCsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -563,8 +548,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) {
TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
- CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
+ CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new TupleCsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -589,8 +574,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10) {
TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
- CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
+ CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new TupleCsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -616,8 +601,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11) {
TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
- CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
+ CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new TupleCsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -644,8 +629,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12) {
TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
- CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
+ CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new TupleCsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -673,8 +658,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13) {
TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
- CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
+ CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new TupleCsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -703,8 +688,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14) {
TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
- CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
+ CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new TupleCsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -734,8 +719,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15) {
TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
- CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
+ CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new TupleCsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -766,8 +751,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16) {
TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
- CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
+ CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new TupleCsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -799,8 +784,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) {
TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
- CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
+ CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new TupleCsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -833,8 +818,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18) {
TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
- CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
+ CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new TupleCsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -868,8 +853,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19) {
TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
- CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
+ CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new TupleCsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -904,8 +889,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20) {
TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
- CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
+ CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new TupleCsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -941,8 +926,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21) {
TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
- CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
+ CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new TupleCsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -979,8 +964,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22) {
TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
- CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
+ CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new TupleCsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -1018,8 +1003,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23) {
TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
- CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
+ CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new TupleCsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
@@ -1058,8 +1043,8 @@ public class CsvReader {
*/
public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23, Class<T24> type24) {
TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
- CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types);
- configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
+ CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new TupleCsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types, this.includedMask);
+ configureInputFormat(inputFormat);
return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
new file mode 100644
index 0000000..2f1139c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A CSV input format that produces POJO instances of type {@code OUT}.
+ * CSV columns (after applying the included-fields mask) are mapped, in order,
+ * onto the given POJO field names, and values are written into the POJO via
+ * reflection in {@link #fillRecord(Object, Object[])}.
+ */
+public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ // Class of the produced POJO; kept (serializable) so fields can be
+ // re-resolved by reflection after the format is deserialized on a task.
+ private Class<OUT> pojoTypeClass;
+
+ // Names of the POJO fields targeted by the CSV columns, in column order.
+ private String[] pojoFieldNames;
+
+ // transient: only needed while configuring in the constructor; not shipped.
+ private transient PojoTypeInfo<OUT> pojoTypeInfo;
+ // transient: java.lang.reflect.Field is not serializable; resolved in open().
+ private transient Field[] pojoFields;
+
+ // --- Convenience constructors; all delegate to configure(...) with
+ // --- defaults for delimiters, field names, and the included-fields mask.
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo);
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity()));
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask));
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+ super(filePath);
+ // A null int[] mask means "include all fields named in fieldNames".
+ boolean[] mask = (includedFieldsMask == null)
+ ? createDefaultMask(fieldNames.length)
+ : toBooleanMask(includedFieldsMask);
+ configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, mask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+ this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+ }
+
+ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+ super(filePath);
+ configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, includedFieldsMask);
+ }
+
+ /**
+ * Shared constructor body: validates the requested POJO field names against
+ * the type info, sets the delimiters, registers the per-column field classes
+ * with the parent format, and records the POJO field order.
+ */
+ private void configure(String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+
+ if (includedFieldsMask == null) {
+ includedFieldsMask = createDefaultMask(fieldNames.length);
+ }
+
+ // Fail fast on null or unknown field names before touching any state.
+ for (String name : fieldNames) {
+ if (name == null) {
+ throw new NullPointerException("Field name must not be null.");
+ }
+ if (pojoTypeInfo.getFieldIndex(name) < 0) {
+ throw new IllegalArgumentException("Field \"" + name + "\" not part of POJO type " + pojoTypeInfo.getTypeClass().getCanonicalName());
+ }
+ }
+
+ setDelimiter(lineDelimiter);
+ setFieldDelimiter(fieldDelimiter);
+
+ // Per-column value classes, taken from the POJO field types in the
+ // requested order; these drive the parent's field parsers.
+ Class<?>[] classes = new Class<?>[fieldNames.length];
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ try {
+ classes[i] = pojoTypeInfo.getTypeAt(pojoTypeInfo.getFieldIndex(fieldNames[i])).getTypeClass();
+ } catch (IndexOutOfBoundsException e) {
+ throw new IllegalArgumentException("Invalid field name: " + fieldNames[i]);
+ }
+ }
+
+ this.pojoTypeClass = pojoTypeInfo.getTypeClass();
+ this.pojoTypeInfo = pojoTypeInfo;
+ setFieldsGeneric(includedFieldsMask, classes);
+ setOrderOfPOJOFields(fieldNames);
+ }
+
+ /**
+ * Records the POJO field order and checks that the number of included CSV
+ * columns (true entries in {@code fieldIncluded}) matches the number of
+ * target POJO fields.
+ */
+ private void setOrderOfPOJOFields(String[] fieldNames) {
+ Preconditions.checkNotNull(fieldNames);
+
+ int includedCount = 0;
+ for (boolean isIncluded : fieldIncluded) {
+ if (isIncluded) {
+ includedCount++;
+ }
+ }
+
+ Preconditions.checkArgument(includedCount == fieldNames.length, includedCount
+ + " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+ for (String field : fieldNames) {
+ Preconditions.checkNotNull(field, "The field name cannot be null.");
+ Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+ "Field \"" + field + "\" is not a member of POJO class " + pojoTypeClass.getName());
+ }
+
+ // Defensive copy so later mutation of the caller's array has no effect.
+ pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length);
+ }
+
+ @Override
+ public void open(FileInputSplit split) throws IOException {
+ super.open(split);
+
+ // Re-resolve the reflective Field handles here because pojoFields is
+ // transient (Field is not serializable) and may be null after shipping.
+ pojoFields = new Field[pojoFieldNames.length];
+
+ Map<String, Field> allFields = new HashMap<String, Field>();
+
+ findAllFields(pojoTypeClass, allFields);
+
+ for (int i = 0; i < pojoFieldNames.length; i++) {
+ pojoFields[i] = allFields.get(pojoFieldNames[i]);
+
+ if (pojoFields[i] != null) {
+ // Allow writing private/protected fields.
+ pojoFields[i].setAccessible(true);
+ } else {
+ throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
+ }
+ }
+ }
+
+ /**
+ * Finds all declared fields in a class and all its super classes.
+ *
+ * @param clazz Class for which all declared fields are found
+ * @param allFields Map containing all found fields so far
+ */
+ private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
+ // Subclass fields are inserted first, so a field re-declared in a
+ // subclass shadows the superclass field of the same name? NOTE(review):
+ // HashMap.put overwrites, so the SUPERCLASS entry wins here — confirm
+ // this is the intended shadowing order.
+ for (Field field : clazz.getDeclaredFields()) {
+ allFields.put(field.getName(), field);
+ }
+
+ if (clazz.getSuperclass() != null) {
+ findAllFields(clazz.getSuperclass(), allFields);
+ }
+ }
+
+ /**
+ * Writes the parsed CSV values into the reuse POJO, one reflective
+ * field-set per included column, and returns the same instance.
+ */
+ @Override
+ public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+ for (int i = 0; i < parsedValues.length; i++) {
+ try {
+ pojoFields[i].set(reuse, parsedValues[i]);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
+ }
+ }
+ return reuse;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
new file mode 100644
index 0000000..82caddd
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.core.fs.Path;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+
+/**
+ * A CSV input format that produces one tuple of type {@code OUT} per parsed line,
+ * delegating field assignment to the tuple type's serializer.
+ */
+public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ // Serializer used to write parsed field values into the reused tuple instance.
+ private TupleSerializerBase<OUT> tupleSerializer;
+
+ public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
+ }
+
+ public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+ this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
+ }
+
+ public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+ }
+
+ public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+ super(filePath);
+ // A null int[] mask means "include all fields"; otherwise the position-based
+ // int mask is converted into a boolean include/exclude mask.
+ boolean[] mask = (includedFieldsMask == null)
+ ? createDefaultMask(tupleTypeInfo.getArity())
+ : toBooleanMask(includedFieldsMask);
+ configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
+ }
+
+ public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+ this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+ }
+
+ public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+ super(filePath);
+ configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
+ }
+
+ // Shared initialization for all constructors: validates the arity, creates the
+ // tuple serializer, and wires delimiters and per-position parser classes.
+ private void configure(String lineDelimiter, String fieldDelimiter,
+ TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+
+ if (tupleTypeInfo.getArity() == 0) {
+ throw new IllegalArgumentException("Tuple size must be greater than 0.");
+ }
+
+ if (includedFieldsMask == null) {
+ includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
+ }
+
+ // NOTE(review): the serializer is created from a fresh ExecutionConfig rather
+ // than the job's config — confirm no config-sensitive serializer settings matter here.
+ tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
+
+ setDelimiter(lineDelimiter);
+ setFieldDelimiter(fieldDelimiter);
+
+ // Derive the parser target class for every tuple position from the type info.
+ Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
+
+ for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
+ classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
+ }
+
+ setFieldsGeneric(includedFieldsMask, classes);
+ }
+
+ /**
+ * Fills the reusable tuple with the parsed values via the tuple serializer.
+ *
+ * @param reuse the tuple instance to populate
+ * @param parsedValues the parsed field values, in configured field order
+ * @return the populated tuple (same object as {@code reuse})
+ */
+ @Override
+ public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+ return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index a2d37ce..70b9393 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -469,19 +469,12 @@ class TupleGenerator {
// create csv input format
sb.append("\t\tCsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
- sb.append(">> inputFormat = new CsvInputFormat<Tuple" + numFields + "<");
+ sb.append(">> inputFormat = new TupleCsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
- sb.append(">>(path, types);\n");
+ sb.append(">>(path, types, this.includedMask);\n");
// configure input format
- sb.append("\t\tconfigureInputFormat(inputFormat, ");
- for (int i = 0; i < numFields; i++) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append("type" + i);
- }
- sb.append(");\n");
+ sb.append("\t\tconfigureInputFormat(inputFormat);\n");
// return
sb.append("\t\treturn new DataSource<Tuple" + numFields + "<");
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 46e3990..0897063 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -89,6 +89,14 @@ public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
}
@Override
+ public T createOrReuseInstance(Object[] fields, T reuse) {
+ // Reuse-variant of createInstance: overwrites each tuple position in place
+ // instead of allocating a new tuple, and returns the same instance.
+ for (int i = 0; i < arity; i++) {
+ reuse.setField(fields[i], i);
+ }
+ return reuse;
+ }
+
+ @Override
public T copy(T from) {
T target = instantiateRaw();
for (int i = 0; i < arity; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index bf3c7a1..fc657a1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -67,6 +67,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
// of immutable Tuples (i.e. Scala Tuples)
public abstract T createInstance(Object[] fields);
+ public abstract T createOrReuseInstance(Object[] fields, T reuse);
+
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
for (int i = 0; i < arity; i++) {