You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2015/11/18 21:42:45 UTC

[2/2] flink git commit: [FLINK-2692] Untangle CsvInputFormat

[FLINK-2692] Untangle CsvInputFormat

This closes #1266


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd61f2db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd61f2db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd61f2db

Branch: refs/heads/master
Commit: bd61f2dbdf1a0215363ffa8416329e1dbf277593
Parents: fc6fec7
Author: zentol <ch...@apache.org>
Authored: Sun Oct 18 20:23:23 2015 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Nov 18 21:42:00 2015 +0100

----------------------------------------------------------------------
 .../wordcount/BoltTokenizerWordCountPojo.java   |   3 +-
 .../BoltTokenizerWordCountWithNames.java        |   3 +-
 .../flink/api/java/io/CommonCsvInputFormat.java | 258 -------------------
 .../flink/api/java/io/CsvInputFormat.java       | 127 ++++++++-
 .../org/apache/flink/api/java/io/CsvReader.java | 125 ++++-----
 .../flink/api/java/io/PojoCsvInputFormat.java   | 199 ++++++++++++++
 .../flink/api/java/io/TupleCsvInputFormat.java  |  90 +++++++
 .../flink/api/java/tuple/TupleGenerator.java    |  13 +-
 .../java/typeutils/runtime/TupleSerializer.java |   8 +
 .../typeutils/runtime/TupleSerializerBase.java  |   2 +
 .../flink/api/java/io/CsvInputFormatTest.java   |  96 ++-----
 .../flink/python/api/PythonPlanBinder.java      |  13 +-
 .../optimizer/ReplicatingDataSourceTest.java    |  26 +-
 .../scala/operators/ScalaCsvInputFormat.java    |  65 -----
 .../scala/operators/ScalaCsvOutputFormat.java   |   6 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |  56 ++--
 .../scala/typeutils/CaseClassSerializer.scala   |   4 +
 .../flink/api/scala/io/CsvInputFormatTest.scala |  78 ++----
 18 files changed, 573 insertions(+), 599 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
index 9bdcead..e093714 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -20,6 +20,7 @@ package org.apache.flink.storm.wordcount;
 import backtype.storm.topology.IRichBolt;
 
 import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.PojoCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -119,7 +120,7 @@ public class BoltTokenizerWordCountPojo {
 			// read the text file from given input path
 			PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor
 					.getForObject(new Sentence(""));
-			return env.createInput(new CsvInputFormat<Sentence>(new Path(
+			return env.createInput(new PojoCsvInputFormat<Sentence>(new Path(
 					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
 					CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
 					sourceType);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
index 019f1bc..f5a1a35 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -21,6 +21,7 @@ import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -122,7 +123,7 @@ public class BoltTokenizerWordCountWithNames {
 			// read the text file from given input path
 			TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
 					.getForObject(new Tuple1<String>(""));
-			return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
+			return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path(
 					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
 					CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
 					sourceType);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
deleted file mode 100644
index 444d151..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class CommonCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	public static final String DEFAULT_LINE_DELIMITER = "\n";
-
-	public static final String DEFAULT_FIELD_DELIMITER = ",";
-
-	protected transient Object[] parsedValues;
-
-	private final  Class<OUT> pojoTypeClass;
-
-	private String[] pojoFieldNames;
-
-	private transient PojoTypeInfo<OUT> pojoTypeInfo;
-	private transient Field[] pojoFields;
-
-	public CommonCsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
-	}
-
-	public CommonCsvInputFormat(
-			Path filePath,
-			String lineDelimiter,
-			String fieldDelimiter,
-			CompositeType<OUT> compositeTypeInfo) {
-		super(filePath);
-
-		setDelimiter(lineDelimiter);
-		setFieldDelimiter(fieldDelimiter);
-
-		Class<?>[] classes = new Class<?>[compositeTypeInfo.getArity()];
-
-		for (int i = 0; i < compositeTypeInfo.getArity(); i++) {
-			classes[i] = compositeTypeInfo.getTypeAt(i).getTypeClass();
-		}
-
-		setFieldTypes(classes);
-
-		if (compositeTypeInfo instanceof PojoTypeInfo) {
-			pojoTypeInfo = (PojoTypeInfo<OUT>) compositeTypeInfo;
-
-			pojoTypeClass = compositeTypeInfo.getTypeClass();
-			setOrderOfPOJOFields(compositeTypeInfo.getFieldNames());
-		} else {
-			pojoTypeClass = null;
-			pojoFieldNames = null;
-		}
-	}
-
-	public void setOrderOfPOJOFields(String[] fieldNames) {
-		Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
-		Preconditions.checkNotNull(fieldNames);
-
-		int includedCount = 0;
-		for (boolean isIncluded : fieldIncluded) {
-			if (isIncluded) {
-				includedCount++;
-			}
-		}
-
-		Preconditions.checkArgument(includedCount == fieldNames.length, includedCount +
-			" CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
-
-		for (String field : fieldNames) {
-			Preconditions.checkNotNull(field, "The field name cannot be null.");
-			Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
-				"Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
-		}
-
-		pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length);
-	}
-
-	public void setFieldTypes(Class<?>... fieldTypes) {
-		if (fieldTypes == null || fieldTypes.length == 0) {
-			throw new IllegalArgumentException("Field types must not be null or empty.");
-		}
-
-		setFieldTypesGeneric(fieldTypes);
-	}
-
-	public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldIndices);
-		Preconditions.checkNotNull(fieldTypes);
-
-		checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
-		setFieldsGeneric(sourceFieldIndices, fieldTypes);
-	}
-
-	public  void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldMask);
-		Preconditions.checkNotNull(fieldTypes);
-
-		setFieldsGeneric(sourceFieldMask, fieldTypes);
-	}
-
-	public Class<?>[] getFieldTypes() {
-		return super.getGenericFieldTypes();
-	}
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-
-		@SuppressWarnings("unchecked")
-		FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
-
-		//throw exception if no field parsers are available
-		if (fieldParsers.length == 0) {
-			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
-		}
-
-		// create the value holders
-		this.parsedValues = new Object[fieldParsers.length];
-		for (int i = 0; i < fieldParsers.length; i++) {
-			this.parsedValues[i] = fieldParsers[i].createValue();
-		}
-
-		// left to right evaluation makes access [0] okay
-		// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
-		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
-			this.lineDelimiterIsLinebreak = true;
-		}
-
-		// for POJO type
-		if (pojoTypeClass != null) {
-			pojoFields = new Field[pojoFieldNames.length];
-
-			Map<String, Field> allFields = new HashMap<String, Field>();
-
-			findAllFields(pojoTypeClass, allFields);
-
-			for (int i = 0; i < pojoFieldNames.length; i++) {
-				pojoFields[i] = allFields.get(pojoFieldNames[i]);
-
-				if (pojoFields[i] != null) {
-					pojoFields[i].setAccessible(true);
-				} else {
-					throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
-				}
-			}
-		}
-
-		this.commentCount = 0;
-		this.invalidLineCount = 0;
-	}
-
-	/**
-	 * Finds all declared fields in a class and all its super classes.
-	 *
-	 * @param clazz Class for which all declared fields are found
-	 * @param allFields Map containing all found fields so far
-	 */
-	private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
-		for (Field field: clazz.getDeclaredFields()) {
-			allFields.put(field.getName(), field);
-		}
-
-		if (clazz.getSuperclass() != null) {
-			findAllFields(clazz.getSuperclass(), allFields);
-		}
-	}
-
-	@Override
-	public OUT nextRecord(OUT record) throws IOException {
-		OUT returnRecord = null;
-		do {
-			returnRecord = super.nextRecord(record);
-		} while (returnRecord == null && !reachedEnd());
-
-		return returnRecord;
-	}
-
-	@Override
-	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
-		/*
-		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
-		 */
-		//Find windows end line, so find carriage return before the newline
-		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
-			//reduce the number of bytes so that the Carriage return is not taken as data
-			numBytes--;
-		}
-
-		if (commentPrefix != null && commentPrefix.length <= numBytes) {
-			//check record for comments
-			boolean isComment = true;
-			for (int i = 0; i < commentPrefix.length; i++) {
-				if (commentPrefix[i] != bytes[offset + i]) {
-					isComment = false;
-					break;
-				}
-			}
-			if (isComment) {
-				this.commentCount++;
-				return null;
-			}
-		}
-
-		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			if (pojoTypeClass == null) {
-				// result type is tuple
-				return createTuple(reuse);
-			} else {
-				// result type is POJO
-				for (int i = 0; i < parsedValues.length; i++) {
-					try {
-						pojoFields[i].set(reuse, parsedValues[i]);
-					} catch (IllegalAccessException e) {
-						throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
-					}
-				}
-				return reuse;
-			}
-
-		} else {
-			this.invalidLineCount++;
-			return null;
-		}
-	}
-
-	protected abstract OUT createTuple(OUT reuse);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 7d86f39..8f0aa64 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -18,32 +18,133 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
 	private static final long serialVersionUID = 1L;
+
+	public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+	public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+	protected transient Object[] parsedValues;
 	
-	public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
-		super(filePath, typeInformation);
+	protected CsvInputFormat(Path filePath) {
+		super(filePath);
 	}
-	
-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
-		super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+
+		@SuppressWarnings("unchecked")
+		FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+		//throw exception if no field parsers are available
+		if (fieldParsers.length == 0) {
+			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+		}
+
+		// create the value holders
+		this.parsedValues = new Object[fieldParsers.length];
+		for (int i = 0; i < fieldParsers.length; i++) {
+			this.parsedValues[i] = fieldParsers[i].createValue();
+		}
+
+		// left to right evaluation makes access [0] okay
+		// this marker is used to speed up readRecord, so that it does not have to check on every call whether the line delimiter is the default line break
+		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+			this.lineDelimiterIsLinebreak = true;
+		}
+
+		this.commentCount = 0;
+		this.invalidLineCount = 0;
+	}
+
+	@Override
+	public OUT nextRecord(OUT record) throws IOException {
+		OUT returnRecord = null;
+		do {
+			returnRecord = super.nextRecord(record);
+		} while (returnRecord == null && !reachedEnd());
+
+		return returnRecord;
 	}
 
 	@Override
-	protected OUT createTuple(OUT reuse) {
-		Tuple result = (Tuple) reuse;
-		for (int i = 0; i < parsedValues.length; i++) {
-			result.setField(parsedValues[i], i);
+	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+		/*
+		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
+		 */
+		//Find windows end line, so find carriage return before the newline
+		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+			//reduce the number of bytes so that the Carriage return is not taken as data
+			numBytes--;
+		}
+
+		if (commentPrefix != null && commentPrefix.length <= numBytes) {
+			//check record for comments
+			boolean isComment = true;
+			for (int i = 0; i < commentPrefix.length; i++) {
+				if (commentPrefix[i] != bytes[offset + i]) {
+					isComment = false;
+					break;
+				}
+			}
+			if (isComment) {
+				this.commentCount++;
+				return null;
+			}
+		}
+
+		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+			return fillRecord(reuse, parsedValues);
+		} else {
+			this.invalidLineCount++;
+			return null;
+		}
+	}
+
+	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
+
+	public Class<?>[] getFieldTypes() {
+		return super.getGenericFieldTypes();
+	}
+
+	protected static boolean[] createDefaultMask(int size) {
+		boolean[] includedMask = new boolean[size];
+		for (int x=0; x<includedMask.length; x++) {
+			includedMask[x] = true;
+		}
+		return includedMask;
+	}
+
+	protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
+		Preconditions.checkNotNull(sourceFieldIndices);
+
+		for (int i : sourceFieldIndices) {
+			if (i < 0) {
+				throw new IllegalArgumentException("Field indices must not be smaller than zero.");
+			}
+		}
+
+		boolean[] includedMask = new boolean[Ints.max(sourceFieldIndices) + 1];
+
+		// mark every requested source field index as included in the mask
+		for (int i = 0; i < sourceFieldIndices.length; i++) {
+			includedMask[sourceFieldIndices[i]] = true;
 		}
 
-		return reuse;
+		return includedMask;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 34b5b47..1bb2eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -292,19 +292,10 @@ public class CsvReader {
 
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType);
-		CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
 
-		Class<?>[] classes = new Class<?>[pojoFields.length];
-		for (int i = 0; i < pojoFields.length; i++) {
-			int pos = typeInfo.getFieldIndex(pojoFields[i]);
-			if(pos < 0) {
-				throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName());
-			}
-			classes[i] = typeInfo.getPojoFieldAt(pos).getTypeInformation().getTypeClass();
-		}
+		CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask);
 
-		configureInputFormat(inputFormat, classes);
-		inputFormat.setOrderOfPOJOFields(pojoFields);
+		configureInputFormat(inputFormat);
 
 		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
@@ -325,14 +316,14 @@ public class CsvReader {
 		
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
-		CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
+		CsvInputFormat<T> inputFormat = new TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask);
 		
 		Class<?>[] classes = new Class<?>[typeInfo.getArity()];
 		for (int i = 0; i < typeInfo.getArity(); i++) {
 			classes[i] = typeInfo.getTypeAt(i).getTypeClass();
 		}
 		
-		configureInputFormat(inputFormat, classes);
+		configureInputFormat(inputFormat);
 		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
 	
@@ -340,7 +331,7 @@ public class CsvReader {
 	// Miscellaneous
 	// --------------------------------------------------------------------------------------------
 	
-	private void configureInputFormat(CsvInputFormat<?> format, Class<?>... types) {
+	private void configureInputFormat(CsvInputFormat<?> format) {
 		format.setDelimiter(this.lineDelimiter);
 		format.setFieldDelimiter(this.fieldDelimiter);
 		format.setCommentPrefix(this.commentPrefix);
@@ -349,12 +340,6 @@ public class CsvReader {
 		if (this.parseQuotedStrings) {
 			format.enableQuotedStringParsing(this.quoteCharacter);
 		}
-
-		if (this.includedMask == null) {
-			format.setFieldTypes(types);
-		} else {
-			format.setFields(this.includedMask, types);
-		}
 	}
 	
 	// --------------------------------------------------------------------------------------------	
@@ -374,8 +359,8 @@ public class CsvReader {
 	 */
 	public <T0> DataSource<Tuple1<T0>> types(Class<T0> type0) {
 		TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0);
-		CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path, types);
-		configureInputFormat(inputFormat, type0);
+		CsvInputFormat<Tuple1<T0>> inputFormat = new TupleCsvInputFormat<Tuple1<T0>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -391,8 +376,8 @@ public class CsvReader {
 	 */
 	public <T0, T1> DataSource<Tuple2<T0, T1>> types(Class<T0> type0, Class<T1> type1) {
 		TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1);
-		CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path, types);
-		configureInputFormat(inputFormat, type0, type1);
+		CsvInputFormat<Tuple2<T0, T1>> inputFormat = new TupleCsvInputFormat<Tuple2<T0, T1>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -409,8 +394,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2> DataSource<Tuple3<T0, T1, T2>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2) {
 		TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2);
-		CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2);
+		CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new TupleCsvInputFormat<Tuple3<T0, T1, T2>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -428,8 +413,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3> DataSource<Tuple4<T0, T1, T2, T3>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) {
 		TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3);
-		CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3);
+		CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new TupleCsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -448,8 +433,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4> DataSource<Tuple5<T0, T1, T2, T3, T4>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4) {
 		TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4);
-		CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
+		CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new TupleCsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -469,8 +454,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5> DataSource<Tuple6<T0, T1, T2, T3, T4, T5>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5) {
 		TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5);
-		CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
+		CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new TupleCsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -491,8 +476,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6> DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6) {
 		TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
-		CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
+		CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new TupleCsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -514,8 +499,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7> DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7) {
 		TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
-		CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
+		CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new TupleCsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -538,8 +523,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8> DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8) {
 		TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
-		CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
+		CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new TupleCsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -563,8 +548,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) {
 		TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
-		CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
+		CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new TupleCsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -589,8 +574,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10) {
 		TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
-		CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
+		CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new TupleCsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -616,8 +601,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11) {
 		TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
-		CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
+		CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new TupleCsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -644,8 +629,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12) {
 		TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
-		CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
+		CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new TupleCsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -673,8 +658,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13) {
 		TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
-		CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
+		CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new TupleCsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -703,8 +688,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14) {
 		TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
-		CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
+		CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new TupleCsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -734,8 +719,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15) {
 		TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
-		CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
+		CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new TupleCsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -766,8 +751,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16) {
 		TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
-		CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
+		CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new TupleCsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -799,8 +784,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) {
 		TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
-		CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
+		CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new TupleCsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -833,8 +818,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18) {
 		TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
-		CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
+		CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new TupleCsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -868,8 +853,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19) {
 		TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
-		CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
+		CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new TupleCsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -904,8 +889,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20) {
 		TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
-		CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
+		CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new TupleCsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -941,8 +926,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21) {
 		TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
-		CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
+		CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new TupleCsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -979,8 +964,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22) {
 		TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
-		CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
+		CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new TupleCsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -1018,8 +1003,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23) {
 		TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
-		CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
+		CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new TupleCsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
@@ -1058,8 +1043,8 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23, Class<T24> type24) {
 		TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
-		CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types);
-		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
+		CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new TupleCsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types, this.includedMask);
+		configureInputFormat(inputFormat);
 		return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
new file mode 100644
index 0000000..2f1139c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * CSV input format that writes the parsed values of every record into the fields of a POJO.
+ *
+ * <p>The i-th entry of the field-name array names the POJO field that receives the i-th
+ * parsed value (see {@link #fillRecord(Object, Object[])}). The number of field names must
+ * equal the number of fields selected by the inclusion mask.
+ *
+ * <p>The reflective {@link Field} handles are {@code transient}; they are looked up again by
+ * name every time {@link #open(FileInputSplit)} is called.
+ *
+ * @param <OUT> the POJO type produced by this format
+ */
+public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Class of the POJO; used in {@link #open(FileInputSplit)} to find its fields reflectively. */
+	private Class<OUT> pojoTypeClass;
+
+	/** Names of the POJO fields to fill; entry i receives the i-th parsed value. */
+	private String[] pojoFieldNames;
+
+	/** Type information passed to the constructor; transient, only used while configuring. */
+	private transient PojoTypeInfo<OUT> pojoTypeInfo;
+	/** Reflection handles matching {@link #pojoFieldNames}; rebuilt in {@link #open(FileInputSplit)}. */
+	private transient Field[] pojoFields;
+
+	/** Reads all fields of the POJO using the default line and field delimiters. */
+	public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo);
+	}
+
+	/** Reads the named POJO fields, in the given order, using the default delimiters. */
+	public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+		// The default mask must select exactly one CSV column per requested field name.
+		// Sizing it by the POJO arity would reject any proper subset of field names, because
+		// the number of included columns is required to equal fieldNames.length.
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+	}
+
+	/** Reads all fields of the POJO using the given delimiters. */
+	public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+		this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+	}
+
+	/** Reads the named POJO fields, in the given order, using the given delimiters. */
+	public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+		this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+	}
+
+	/** Reads all POJO fields from the CSV columns selected by an {@code int[]} mask. */
+	public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+		// Delegate the int[] mask unchanged, like the other int[] overloads, so that a null
+		// mask is replaced by the default mask in the int[] master constructor.
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+	}
+
+	/** Reads the named POJO fields from the CSV columns selected by an {@code int[]} mask. */
+	public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+	}
+
+	/** Reads all POJO fields from the CSV columns selected by an {@code int[]} mask, with custom delimiters. */
+	public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+		this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+	}
+
+	/**
+	 * Master constructor for {@code int[]} masks.
+	 *
+	 * @param includedFieldsMask column selection; {@code null} selects one column per field name
+	 */
+	public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+		super(filePath);
+		boolean[] mask = (includedFieldsMask == null)
+				? createDefaultMask(fieldNames.length)
+				: toBooleanMask(includedFieldsMask);
+		configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, mask);
+	}
+
+	/** Reads all POJO fields from the CSV columns selected by a {@code boolean[]} mask. */
+	public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+	}
+
+	/** Reads the named POJO fields from the CSV columns selected by a {@code boolean[]} mask. */
+	public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+	}
+
+	/** Reads all POJO fields from the CSV columns selected by a {@code boolean[]} mask, with custom delimiters. */
+	public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+		this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+	}
+
+	/**
+	 * Master constructor for {@code boolean[]} masks.
+	 *
+	 * @param includedFieldsMask column selection; {@code null} selects one column per field name
+	 */
+	public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+		super(filePath);
+		configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, includedFieldsMask);
+	}
+
+	/**
+	 * Validates the field names against the POJO type, applies the delimiters, derives the
+	 * class of every named field and registers mask, classes and field order.
+	 *
+	 * @throws NullPointerException if a field name is {@code null}
+	 * @throws IllegalArgumentException if a field name is not a field of the POJO type
+	 */
+	private void configure(String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+
+		if (includedFieldsMask == null) {
+			includedFieldsMask = createDefaultMask(fieldNames.length);
+		}
+
+		for (String name : fieldNames) {
+			if (name == null) {
+				throw new NullPointerException("Field name must not be null.");
+			}
+			if (pojoTypeInfo.getFieldIndex(name) < 0) {
+				throw new IllegalArgumentException("Field \"" + name + "\" not part of POJO type " + pojoTypeInfo.getTypeClass().getCanonicalName());
+			}
+		}
+
+		setDelimiter(lineDelimiter);
+		setFieldDelimiter(fieldDelimiter);
+
+		// classes[i] is the declared type of the POJO field named fieldNames[i]
+		Class<?>[] classes = new Class<?>[fieldNames.length];
+
+		for (int i = 0; i < fieldNames.length; i++) {
+			try {
+				classes[i] = pojoTypeInfo.getTypeAt(pojoTypeInfo.getFieldIndex(fieldNames[i])).getTypeClass();
+			} catch (IndexOutOfBoundsException e) {
+				// keep the original exception as cause so the failing lookup stays visible
+				throw new IllegalArgumentException("Invalid field name: " + fieldNames[i], e);
+			}
+		}
+
+		this.pojoTypeClass = pojoTypeInfo.getTypeClass();
+		this.pojoTypeInfo = pojoTypeInfo;
+		setFieldsGeneric(includedFieldsMask, classes);
+		setOrderOfPOJOFields(fieldNames);
+	}
+
+	/**
+	 * Stores a copy of the field names after checking that there is exactly one name per
+	 * included CSV field and that every name is a field of the POJO type.
+	 */
+	private void setOrderOfPOJOFields(String[] fieldNames) {
+		Preconditions.checkNotNull(fieldNames);
+
+		// fieldIncluded is inherited; presumably it reflects the mask passed to setFieldsGeneric
+		int includedCount = 0;
+		for (boolean isIncluded : fieldIncluded) {
+			if (isIncluded) {
+				includedCount++;
+			}
+		}
+
+		Preconditions.checkArgument(includedCount == fieldNames.length, includedCount
+				+ " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+		for (String field : fieldNames) {
+			Preconditions.checkNotNull(field, "The field name cannot be null.");
+			Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+					"Field \"" + field + "\" is not a member of POJO class " + pojoTypeClass.getName());
+		}
+
+		pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length);
+	}
+
+	/**
+	 * Opens the split and resolves every configured field name to an accessible
+	 * {@link Field} of the POJO class or one of its super classes.
+	 *
+	 * @throws RuntimeException if a configured field name cannot be found
+	 */
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+
+		pojoFields = new Field[pojoFieldNames.length];
+
+		Map<String, Field> allFields = new HashMap<String, Field>();
+
+		findAllFields(pojoTypeClass, allFields);
+
+		for (int i = 0; i < pojoFieldNames.length; i++) {
+			pojoFields[i] = allFields.get(pojoFieldNames[i]);
+
+			if (pojoFields[i] != null) {
+				pojoFields[i].setAccessible(true);
+			} else {
+				throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
+			}
+		}
+	}
+
+	/**
+	 * Finds all declared fields in a class and all its super classes.
+	 *
+	 * <p>Fields are registered by simple name, subclass first; a field of the same name
+	 * declared in a super class replaces the entry of the subclass.
+	 *
+	 * @param clazz Class for which all declared fields are found
+	 * @param allFields Map containing all found fields so far
+	 */
+	private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
+		for (Field field : clazz.getDeclaredFields()) {
+			allFields.put(field.getName(), field);
+		}
+
+		if (clazz.getSuperclass() != null) {
+			findAllFields(clazz.getSuperclass(), allFields);
+		}
+	}
+
+	/**
+	 * Sets {@code parsedValues[i]} on the i-th configured POJO field of {@code reuse}.
+	 *
+	 * @param reuse POJO instance to fill
+	 * @param parsedValues values of the current record
+	 * @return the same {@code reuse} instance
+	 * @throws RuntimeException if a field cannot be written reflectively
+	 */
+	@Override
+	public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+		for (int i = 0; i < parsedValues.length; i++) {
+			try {
+				pojoFields[i].set(reuse, parsedValues[i]);
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
+			}
+		}
+		return reuse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
new file mode 100644
index 0000000..82caddd
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.core.fs.Path;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+
+/**
+ * CSV input format that produces tuples. Record construction is delegated to the
+ * serializer created from the tuple type information.
+ *
+ * @param <OUT> the tuple type produced by this format
+ */
+public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Serializer of the tuple type; turns the parsed values into the output record. */
+	private TupleSerializerBase<OUT> tupleSerializer;
+
+	/** Reads every tuple field using the default line and field delimiters. */
+	public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
+	}
+
+	/** Reads every tuple field using the given delimiters. */
+	public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+		this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
+	}
+
+	/** Reads the columns selected by an {@code int[]} mask using the default delimiters. */
+	public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+	}
+
+	/** Master constructor for {@code int[]} masks; a {@code null} mask selects one column per tuple field. */
+	public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+		super(filePath);
+		final boolean[] booleanMask;
+		if (includedFieldsMask != null) {
+			booleanMask = toBooleanMask(includedFieldsMask);
+		} else {
+			booleanMask = createDefaultMask(tupleTypeInfo.getArity());
+		}
+		configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, booleanMask);
+	}
+
+	/** Reads the columns selected by a {@code boolean[]} mask using the default delimiters. */
+	public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+	}
+
+	/** Master constructor for {@code boolean[]} masks; a {@code null} mask selects one column per tuple field. */
+	public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+		super(filePath);
+		configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
+	}
+	
+	/**
+	 * Creates the tuple serializer, applies the delimiters and registers the class of
+	 * every tuple field together with the inclusion mask.
+	 *
+	 * @throws IllegalArgumentException if the tuple type has no fields
+	 */
+	private void configure(String lineDelimiter, String fieldDelimiter,
+			TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+
+		final int arity = tupleTypeInfo.getArity();
+		if (arity == 0) {
+			throw new IllegalArgumentException("Tuple size must be greater than 0.");
+		}
+
+		final boolean[] effectiveMask;
+		if (includedFieldsMask != null) {
+			effectiveMask = includedFieldsMask;
+		} else {
+			effectiveMask = createDefaultMask(arity);
+		}
+
+		tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
+
+		setDelimiter(lineDelimiter);
+		setFieldDelimiter(fieldDelimiter);
+
+		// one class per tuple position, in tuple order
+		final Class<?>[] fieldTypes = new Class<?>[arity];
+		for (int pos = 0; pos < arity; pos++) {
+			fieldTypes[pos] = tupleTypeInfo.getTypeAt(pos).getTypeClass();
+		}
+
+		setFieldsGeneric(effectiveMask, fieldTypes);
+	}
+
+	/**
+	 * Hands the parsed values and the reuse object to the tuple serializer and returns
+	 * whatever record the serializer produces.
+	 */
+	@Override
+	public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+		final OUT record = tupleSerializer.createOrReuseInstance(parsedValues, reuse);
+		return record;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index a2d37ce..70b9393 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -469,19 +469,12 @@ class TupleGenerator {
 			// create csv input format
 			sb.append("\t\tCsvInputFormat<Tuple" + numFields + "<");
 			appendTupleTypeGenerics(sb, numFields);
-			sb.append(">> inputFormat = new CsvInputFormat<Tuple" + numFields + "<");
+			sb.append(">> inputFormat = new TupleCsvInputFormat<Tuple" + numFields + "<");
 			appendTupleTypeGenerics(sb, numFields);
-			sb.append(">>(path, types);\n");
+			sb.append(">>(path, types, this.includedMask);\n");
 
 			// configure input format
-			sb.append("\t\tconfigureInputFormat(inputFormat, ");
-			for (int i = 0; i < numFields; i++) {
-				if (i > 0) {
-					sb.append(", ");
-				}
-				sb.append("type" + i);
-			}
-			sb.append(");\n");
+			sb.append("\t\tconfigureInputFormat(inputFormat);\n");
 
 			// return
 			sb.append("\t\treturn new DataSource<Tuple" + numFields + "<");

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 46e3990..0897063 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -89,6 +89,14 @@ public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
 	}
 
 	@Override
+	public T createOrReuseInstance(Object[] fields, T reuse) {
+		// Overwrite the first 'arity' positions of the supplied tuple with the given
+		// values; the supplied instance itself is handed back to the caller.
+		int fieldPos = 0;
+		while (fieldPos < arity) {
+			reuse.setField(fields[fieldPos], fieldPos);
+			fieldPos++;
+		}
+		return reuse;
+	}
+
+	@Override
 	public T copy(T from) {
 		T target = instantiateRaw();
 		for (int i = 0; i < arity; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd61f2db/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index bf3c7a1..fc657a1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -67,6 +67,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 	// of immutable Typles (i.e. Scala Tuples)
 	public abstract T createInstance(Object[] fields);
 
+	/**
+	 * Produces a record holding the given field values; {@code reuse} is offered as an
+	 * existing instance that an implementation may fill and return.
+	 * NOTE(review): presumably serializers for immutable tuples ignore {@code reuse} and
+	 * build a new object, as with {@code createInstance} -- confirm against the subclasses.
+	 *
+	 * @param fields the values for the record, one per tuple position
+	 * @param reuse an existing instance that may be reused
+	 * @return the record holding the given values
+	 */
+	public abstract T createOrReuseInstance(Object[] fields, T reuse);
+
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		for (int i = 0; i < arity; i++) {