You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/03/25 20:45:17 UTC

[4/5] flink git commit: [FLINK-1512] [java api] Add CsvReader for reading into POJOs

[FLINK-1512] [java api] Add CsvReader for reading into POJOs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b1c19cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b1c19cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b1c19cf

Branch: refs/heads/master
Commit: 7b1c19cfc234b26484ca8746b29f865b38b96147
Parents: 033c69f
Author: Chiwan Park <ch...@icloud.com>
Authored: Thu Feb 19 03:27:59 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Mar 25 20:38:59 2015 +0100

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    | 136 +++++++-
 .../flink/api/java/io/CsvInputFormat.java       | 242 +++++---------
 .../org/apache/flink/api/java/io/CsvReader.java |  88 +++--
 .../flink/api/java/tuple/TupleGenerator.java    |   2 +-
 .../flink/api/java/io/CsvInputFormatTest.java   | 330 ++++++++++++++++---
 .../optimizer/ReplicatingDataSourceTest.java    |  36 +-
 .../flink/test/io/CsvReaderWithPOJOITCase.java  | 144 ++++++++
 7 files changed, 738 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 31a2a5a..1803a2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -29,12 +29,21 @@ import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
 
 
 public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
 	
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +59,13 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	// --------------------------------------------------------------------------------------------
 
 	private transient FieldParser<?>[] fieldParsers;
+
+	// To speed up readRecord processing. Used to find windows line endings.
+	// It is set when open so that readRecord does not have to evaluate it
+	protected boolean lineDelimiterIsLinebreak = false;
+
+	protected transient int commentCount;
+	protected transient int invalidLineCount;
 	
 	
 	// --------------------------------------------------------------------------------------------
@@ -58,7 +74,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	
 	private Class<?>[] fieldTypes = EMPTY_TYPES;
 	
-	private boolean[] fieldIncluded = EMPTY_INCLUDED;
+	protected boolean[] fieldIncluded = EMPTY_INCLUDED;
 		
 	private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;
 	
@@ -69,8 +85,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	private boolean quotedStringParsing = false;
 
 	private byte quoteCharacter;
-	
-	
+
+	protected byte[] commentPrefix = null;
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Constructors and getters/setters for the configurable parameters
 	// --------------------------------------------------------------------------------------------
@@ -93,6 +111,46 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		return this.fieldTypes.length;
 	}
 
+	public byte[] getCommentPrefix() {
+		return commentPrefix;
+	}
+
+	public void setCommentPrefix(byte[] commentPrefix) {
+		this.commentPrefix = commentPrefix;
+	}
+
+	public void setCommentPrefix(char commentPrefix) {
+		setCommentPrefix(String.valueOf(commentPrefix));
+	}
+
+	public void setCommentPrefix(String commentPrefix) {
+		setCommentPrefix(commentPrefix, Charsets.UTF_8);
+	}
+
+	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
+		if (charsetName == null) {
+			throw new IllegalArgumentException("Charset name must not be null");
+		}
+
+		if (commentPrefix != null) {
+			Charset charset = Charset.forName(charsetName);
+			setCommentPrefix(commentPrefix, charset);
+		} else {
+			this.commentPrefix = null;
+		}
+	}
+
+	public void setCommentPrefix(String commentPrefix, Charset charset) {
+		if (charset == null) {
+			throw new IllegalArgumentException("Charset must not be null");
+		}
+		if (commentPrefix != null) {
+			this.commentPrefix = commentPrefix.getBytes(charset);
+		} else {
+			this.commentPrefix = null;
+		}
+	}
+
 	public byte[] getFieldDelimiter() {
 		return fieldDelim;
 	}
@@ -291,7 +349,23 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 			readLine(); // read and ignore
 		}
 	}
-	
+
+	@Override
+	public void close() throws IOException {
+		if (this.invalidLineCount > 0) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+			}
+		}
+
+		if (this.commentCount > 0) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
+			}
+		}
+		super.close();
+	}
+
 	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
 		
 		boolean[] fieldIncluded = this.fieldIncluded;
@@ -400,4 +474,58 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 			}
 		}
 	}
+
+	@SuppressWarnings("unused")
+	protected static void checkAndCoSort(int[] positions, Class<?>[] types) {
+		if (positions.length != types.length) {
+			throw new IllegalArgumentException("The positions and types must be of the same length");
+		}
+
+		TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
+
+		for (int i = 0; i < positions.length; i++) {
+			if (positions[i] < 0) {
+				throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
+			}
+			if (types[i] == null) {
+				throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+			}
+
+			if (map.containsKey(positions[i])) {
+				throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
+			}
+
+			map.put(positions[i], types[i]);
+		}
+
+		int i = 0;
+		for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
+			positions[i] = entry.getKey();
+			types[i] = entry.getValue();
+			i++;
+		}
+	}
+
+	protected static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
+		if (positions.length != types.length) {
+			throw new IllegalArgumentException("The positions and types must be of the same length");
+		}
+
+		int lastPos = -1;
+
+		for (int i = 0; i < positions.length; i++) {
+			if (positions[i] < 0) {
+				throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
+			}
+			if (types[i] == null) {
+				throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+			}
+
+			if (positions[i] <= lastPos) {
+				throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
+			}
+
+			lastPos = positions[i];
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 7669c39..ee33484 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -19,15 +19,12 @@
 package org.apache.flink.api.java.io;
 
 
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
-
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
@@ -35,11 +32,12 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
 
 
-public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT> {
+public class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,106 +47,90 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 	private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
 	
 	public static final String DEFAULT_LINE_DELIMITER = "\n";
-	
-	public static final String DEFAULT_FIELD_DELIMITER = ",";
 
+	public static final String DEFAULT_FIELD_DELIMITER = ",";
 
 	private transient Object[] parsedValues;
-	
-	private byte[] commentPrefix = null;
-	
-	// To speed up readRecord processing. Used to find windows line endings.
-	// It is set when open so that readRecord does not have to evaluate it
-	private boolean lineDelimiterIsLinebreak = false;
-	
-	private transient int commentCount;
 
-	private transient int invalidLineCount;
-	
-	
-	public CsvInputFormat(Path filePath) {
-		super(filePath);
-	}	
-	
-	public CsvInputFormat(Path filePath, Class<?> ... types) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-	}	
+	private Class<OUT> pojoTypeClass = null;
+	private String[] pojoFieldsName = null;
+	private transient Field[] pojoFields = null;
+	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+	public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	}
 	
-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
 		super(filePath);
 
+		Preconditions.checkArgument(typeInformation instanceof CompositeType);
+		CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
 		setDelimiter(lineDelimiter);
 		setFieldDelimiter(fieldDelimiter);
 
-		setFieldTypes(types);
-	}
-	
-	
-	public byte[] getCommentPrefix() {
-		return commentPrefix;
-	}
-	
-	public void setCommentPrefix(byte[] commentPrefix) {
-		this.commentPrefix = commentPrefix;
-	}
-	
-	public void setCommentPrefix(char commentPrefix) {
-		setCommentPrefix(String.valueOf(commentPrefix));
-	}
-	
-	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
-	}
-	
-	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-		if (charsetName == null) {
-			throw new IllegalArgumentException("Charset name must not be null");
+		Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+		for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+			classes[i] = compositeType.getTypeAt(i).getTypeClass();
 		}
-		
-		if (commentPrefix != null) {
-			Charset charset = Charset.forName(charsetName);
-			setCommentPrefix(commentPrefix, charset);
-		} else {
-			this.commentPrefix = null;
+		setFieldTypes(classes);
+
+		if (typeInformation instanceof PojoTypeInfo) {
+			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+			pojoTypeClass = typeInformation.getTypeClass();
+			pojoFieldsName = compositeType.getFieldNames();
+			setOrderOfPOJOFields(pojoFieldsName);
 		}
 	}
-	
-	public void setCommentPrefix(String commentPrefix, Charset charset) {
-		if (charset == null) {
-			throw new IllegalArgumentException("Charset must not be null");
+
+	public void setOrderOfPOJOFields(String[] fieldsOrder) {
+		Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
+		Preconditions.checkNotNull(fieldsOrder);
+
+		int includedCount = 0;
+		for (boolean isIncluded : fieldIncluded) {
+			if (isIncluded) {
+				includedCount++;
+			}
 		}
-		if (commentPrefix != null) {
-			this.commentPrefix = commentPrefix.getBytes(charset);
-		} else {
-			this.commentPrefix = null;
+
+		Preconditions.checkArgument(includedCount == fieldsOrder.length, includedCount +
+			" CSV fields and " + fieldsOrder.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+		for (String field : fieldsOrder) {
+			Preconditions.checkNotNull(field, "The field name cannot be null.");
+			Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+				"Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
 		}
+
+		pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
 	}
-	
-	
-	public void setFieldTypes(Class<?> ... fieldTypes) {
+
+	public void setFieldTypes(Class<?>... fieldTypes) {
 		if (fieldTypes == null || fieldTypes.length == 0) {
 			throw new IllegalArgumentException("Field types must not be null or empty.");
 		}
-		
+
 		setFieldTypesGeneric(fieldTypes);
 	}
 
 	public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
 		Preconditions.checkNotNull(sourceFieldIndices);
 		Preconditions.checkNotNull(fieldTypes);
-		
+
 		checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-		
+
 		setFieldsGeneric(sourceFieldIndices, fieldTypes);
 	}
-	
+
 	public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
 		Preconditions.checkNotNull(sourceFieldMask);
 		Preconditions.checkNotNull(fieldTypes);
-		
+
 		setFieldsGeneric(sourceFieldMask, fieldTypes);
 	}
-	
+
 	public Class<?>[] getFieldTypes() {
 		return super.getGenericFieldTypes();
 	}
@@ -176,25 +158,22 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
 			this.lineDelimiterIsLinebreak = true;
 		}
-		
-		this.commentCount = 0;
-		this.invalidLineCount = 0;
-	}
-	
-	@Override
-	public void close() throws IOException {
-		if (this.invalidLineCount > 0) {
-			if (LOG.isWarnEnabled()) {
-					LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+
+		// for POJO type
+		if (pojoTypeClass != null) {
+			pojoFields = new Field[pojoFieldsName.length];
+			for (int i = 0; i < pojoFieldsName.length; i++) {
+				try {
+					pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
+					pojoFields[i].setAccessible(true);
+				} catch (NoSuchFieldException e) {
+					throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
+				}
 			}
 		}
 		
-		if (this.commentCount > 0) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
-			}
-		}
-		super.close();
+		this.commentCount = 0;
+		this.invalidLineCount = 0;
 	}
 	
 	@Override
@@ -203,10 +182,10 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 		do {
 			returnRecord = super.nextRecord(record);
 		} while (returnRecord == null && !reachedEnd());
-		
+
 		return returnRecord;
 	}
-
+	
 	@Override
 	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
 		/*
@@ -234,9 +213,21 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 		}
 		
 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			// valid parse, map values into pact record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(parsedValues[i], i);
+			if (pojoTypeClass == null) {
+				// result type is tuple
+				Tuple result = (Tuple) reuse;
+				for (int i = 0; i < parsedValues.length; i++) {
+					result.setField(parsedValues[i], i);
+				}
+			} else {
+				// result type is POJO
+				for (int i = 0; i < parsedValues.length; i++) {
+					try {
+						pojoFields[i].set(reuse, parsedValues[i]);
+					} catch (IllegalAccessException e) {
+						throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
+					}
+				}
 			}
 			return reuse;
 		} else {
@@ -251,59 +242,4 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
 	}
 	
-	// --------------------------------------------------------------------------------------------
-	
-	@SuppressWarnings("unused")
-	private static void checkAndCoSort(int[] positions, Class<?>[] types) {
-		if (positions.length != types.length) {
-			throw new IllegalArgumentException("The positions and types must be of the same length");
-		}
-		
-		TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
-		
-		for (int i = 0; i < positions.length; i++) {
-			if (positions[i] < 0) {
-				throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
-			}
-			if (types[i] == null) {
-				throw new IllegalArgumentException("The type " + i + " is invalid (null)");
-			}
-			
-			if (map.containsKey(positions[i])) {
-				throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
-			}
-			
-			map.put(positions[i], types[i]);
-		}
-		
-		int i = 0;
-		for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
-			positions[i] = entry.getKey();
-			types[i] = entry.getValue();
-			i++;
-		}
-	}
-	
-	private static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
-		if (positions.length != types.length) {
-			throw new IllegalArgumentException("The positions and types must be of the same length");
-		}
-		
-		int lastPos = -1;
-		
-		for (int i = 0; i < positions.length; i++) {
-			if (positions[i] < 0) {
-				throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
-			}
-			if (types[i] == null) {
-				throw new IllegalArgumentException("The type " + i + " is invalid (null)");
-			}
-			
-			if (positions[i] <= lastPos) {
-				throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
-			}
-			
-			lastPos = positions[i];
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index ac879b7..11ef629 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -21,10 +21,12 @@ package org.apache.flink.api.java.io;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
@@ -275,8 +277,37 @@ public class CsvReader {
 		ignoreInvalidLines = true;
 		return this;
 	}
-	
-	
+
+	/**
+	 * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
+	 * must be public or able to set value. The type information for the fields is obtained from the type class.
+	 *
+	 * @param pojoType The class of the target POJO.
+	 * @param pojoFields The fields of the POJO which are mapped to CSV fields.
+	 * @return The DataSet representing the parsed CSV data.
+	 */
+	public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
+		Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
+		Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");
+
+		@SuppressWarnings("unchecked")
+		PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType);
+		CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
+
+		Class<?>[] classes = new Class<?>[pojoFields.length];
+		for (int i = 0; i < pojoFields.length; i++) {
+			int pos = typeInfo.getFieldIndex(pojoFields[i]);
+			if(pos < 0) {
+				throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName());
+			}
+			classes[i] = typeInfo.getPojoFieldAt(pos).type.getTypeClass();
+		}
+
+		configureInputFormat(inputFormat, classes);
+		inputFormat.setOrderOfPOJOFields(pojoFields);
+
+		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
+	}
 	
 	/**
 	 * Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
@@ -294,7 +325,7 @@ public class CsvReader {
 		
 		@SuppressWarnings("unchecked")
 		TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
-		CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path);
+		CsvInputFormat<T> inputFormat = new CsvInputFormat<T>(path, typeInfo);
 		
 		Class<?>[] classes = new Class<?>[typeInfo.getArity()];
 		for (int i = 0; i < typeInfo.getArity(); i++) {
@@ -318,6 +349,7 @@ public class CsvReader {
 		if (this.parseQuotedStrings) {
 			format.enableQuotedStringParsing(this.quoteCharacter);
 		}
+
 		if (this.includedMask == null) {
 			format.setFieldTypes(types);
 		} else {
@@ -342,7 +374,7 @@ public class CsvReader {
 	 */
 	public <T0> DataSource<Tuple1<T0>> types(Class<T0> type0) {
 		TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0);
-		CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path);
+		CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path, types);
 		configureInputFormat(inputFormat, type0);
 		return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -359,7 +391,7 @@ public class CsvReader {
 	 */
 	public <T0, T1> DataSource<Tuple2<T0, T1>> types(Class<T0> type0, Class<T1> type1) {
 		TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1);
-		CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path);
+		CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path, types);
 		configureInputFormat(inputFormat, type0, type1);
 		return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -377,7 +409,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2> DataSource<Tuple3<T0, T1, T2>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2) {
 		TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2);
-		CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path);
+		CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2);
 		return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -396,7 +428,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3> DataSource<Tuple4<T0, T1, T2, T3>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) {
 		TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3);
-		CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path);
+		CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3);
 		return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -416,7 +448,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4> DataSource<Tuple5<T0, T1, T2, T3, T4>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4) {
 		TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4);
-		CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path);
+		CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
 		return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -437,7 +469,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5> DataSource<Tuple6<T0, T1, T2, T3, T4, T5>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5) {
 		TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5);
-		CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path);
+		CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
 		return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -459,7 +491,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6> DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6) {
 		TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
-		CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path);
+		CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
 		return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -482,7 +514,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7> DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7) {
 		TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
-		CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path);
+		CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
 		return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -506,7 +538,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8> DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8) {
 		TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
-		CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path);
+		CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
 		return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -531,7 +563,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) {
 		TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
-		CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path);
+		CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
 		return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -557,7 +589,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10) {
 		TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
-		CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path);
+		CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
 		return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -584,7 +616,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11) {
 		TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
-		CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path);
+		CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
 		return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -612,7 +644,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12) {
 		TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
-		CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path);
+		CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
 		return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -641,7 +673,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13) {
 		TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
-		CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path);
+		CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
 		return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -671,7 +703,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14) {
 		TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
-		CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path);
+		CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
 		return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -702,7 +734,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15) {
 		TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
-		CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path);
+		CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
 		return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -734,7 +766,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16) {
 		TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
-		CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path);
+		CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
 		return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -767,7 +799,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) {
 		TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
-		CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path);
+		CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
 		return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -801,7 +833,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18) {
 		TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
-		CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path);
+		CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
 		return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -836,7 +868,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19) {
 		TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
-		CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path);
+		CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
 		return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -872,7 +904,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20) {
 		TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
-		CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path);
+		CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
 		return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -909,7 +941,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21) {
 		TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
-		CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path);
+		CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
 		return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -947,7 +979,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22) {
 		TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
-		CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path);
+		CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
 		return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -986,7 +1018,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23) {
 		TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
-		CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path);
+		CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
 		return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
@@ -1026,7 +1058,7 @@ public class CsvReader {
 	 */
 	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types(Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4, Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9, Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13, Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17, Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21, Class<T22> type22, Class<T23> type23, Class<T24> type24) {
 		TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
-		CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path);
+		CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path, types);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
 		return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index ed429e3..03826fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -470,7 +470,7 @@ class TupleGenerator {
 			appendTupleTypeGenerics(sb, numFields);
 			sb.append(">> inputFormat = new CsvInputFormat<Tuple" + numFields + "<");
 			appendTupleTypeGenerics(sb, numFields);
-			sb.append(">>(path);\n");
+			sb.append(">>(path, types);\n");
 
 			// configure input format
 			sb.append("\t\tconfigureInputFormat(inputFormat, ");

http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 6306f6e..bff3fec 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.io;
 
 import com.google.common.base.Charsets;
 
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -53,7 +53,7 @@ public class CsvInputFormatTest {
 	private static final String FIRST_PART = "That is the first part";
 	
 	private static final String SECOND_PART = "That is the second part";
-	
+
 	@Test
 	public void ignoreInvalidLines() {
 		try {
@@ -67,9 +67,9 @@ public class CsvInputFormatTest {
 										"#next|5|6.0|\n";
 			
 			final FileInputSplit split = createTempFile(fileContent);
-			
-			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
-					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|",  String.class, Integer.class, Double.class);
+
+			final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+			final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
 			format.setLenient(true);
 		
 			final Configuration parameters = new Configuration();
@@ -115,9 +115,9 @@ public class CsvInputFormatTest {
 									   "#next|5|6.0|\n";
 			
 			final FileInputSplit split = createTempFile(fileContent);
-			
-			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
-					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+			final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+			final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
 			format.setCommentPrefix("#");
 		
 			final Configuration parameters = new Configuration();
@@ -159,9 +159,9 @@ public class CsvInputFormatTest {
 									   "//next|5|6.0|\n";
 			
 			final FileInputSplit split = createTempFile(fileContent);
-			
-			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
-					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", String.class, Integer.class, Double.class);
+
+			final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+			final CsvInputFormat<Tuple3<String, Integer, Double>> format = new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
 			format.setCommentPrefix("//");
 		
 			final Configuration parameters = new Configuration();
@@ -196,9 +196,10 @@ public class CsvInputFormatTest {
 		try {
 			final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
 			final FileInputSplit split = createTempFile(fileContent);
-			
-			final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class);
-		
+
+			final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+			final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
+
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
 			format.open(split);
@@ -239,7 +240,8 @@ public class CsvInputFormatTest {
 			final String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
 			final FileInputSplit split = createTempFile(fileContent);
 
-			final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class);
+			final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+			final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
 
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
@@ -281,12 +283,12 @@ public class CsvInputFormatTest {
 		try {
 			final String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
 			final FileInputSplit split = createTempFile(fileContent);
-		
-			final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH);
+
+			final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+			final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
 			
 			format.setFieldDelimiter("|-");
-			format.setFieldTypes(String.class, String.class, String.class);
-			
+
 			format.configure(new Configuration());
 			format.open(split);
 
@@ -324,12 +326,13 @@ public class CsvInputFormatTest {
 		try {
 			final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
 			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH);
+
+			final TupleTypeInfo<Tuple5<Integer, Integer, Integer, Integer, Integer>> typeInfo =
+					TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
+			final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH, typeInfo);
 			
 			format.setFieldDelimiter("|");
-			format.setFieldTypes(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
-			
+
 			format.configure(new Configuration());
 			format.open(split);
 			
@@ -365,12 +368,12 @@ public class CsvInputFormatTest {
 		try {
 			final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
 			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH);
+
+			final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
+			final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
 			
 			format.setFieldDelimiter("|");
-			format.setFieldTypes(Integer.class, Integer.class);
-			
+
 			format.configure(new Configuration());
 			format.open(split);
 			
@@ -402,8 +405,9 @@ public class CsvInputFormatTest {
 			final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
 					"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
 			final FileInputSplit split = createTempFile(fileContent);	
-			
-			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
 			
 			format.setFieldDelimiter("|x|");
 			format.setFieldTypes(Integer.class, null, null, Integer.class, null, null, null, Integer.class);
@@ -439,8 +443,9 @@ public class CsvInputFormatTest {
 		try {
 			final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
 			final FileInputSplit split = createTempFile(fileContent);	
-			
-			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
 			
 			format.setFieldDelimiter("|");
 			
@@ -479,8 +484,9 @@ public class CsvInputFormatTest {
 			final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
 					"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
 			final FileInputSplit split = createTempFile(fileContent);	
-			
-			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+
+			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
 			
 			format.setFieldDelimiter("&&");
 
@@ -516,7 +522,8 @@ public class CsvInputFormatTest {
 	@Test
 	public void testReadSparseWithShuffledPositions() throws IOException {
 		try {
-			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH);
+			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
+			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
 			
 			format.setFieldDelimiter("|");
 			
@@ -570,8 +577,9 @@ public class CsvInputFormatTest {
 
 		final FileInputSplit split = createTempFile(fileContent);
 
-		final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format =
-				new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH);
+		final TupleTypeInfo<Tuple5<Integer, String, String, String, Double>> typeInfo =
+				TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class, String.class, String.class, Double.class);
+		final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
 
 		format.setSkipFirstLineAsHeader(true);
 		format.setFieldDelimiter(',');
@@ -651,9 +659,10 @@ public class CsvInputFormatTest {
 			OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
 			wrt.write(fileContent);
 			wrt.close();
-			
-			CsvInputFormat<Tuple1<String>> inputFormat = new CsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()),String.class);
-			
+
+			final TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
+			final CsvInputFormat<Tuple1<String>> inputFormat = new CsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
+
 			Configuration parameters = new Configuration(); 
 			inputFormat.configure(parameters);
 			
@@ -684,4 +693,241 @@ public class CsvInputFormatTest {
 		}
 	}
 
+	private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
+		PojoItem item = new PojoItem();
+
+		format.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("AAA", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("BBB", item.field4);
+
+		format.nextRecord(item);
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(1.123), item.field3);
+		assertEquals("AAA", item.field4);
+	}
+
+	@Test
+	public void testPojoType() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		validatePojoItem(inputFormat);
+	}
+
+	@Test
+	public void testPojoTypeWithPrivateField() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+		CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		inputFormat.configure(new Configuration());
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+		inputFormat.open(splits[0]);
+
+		PrivatePojoItem item = new PrivatePojoItem();
+		inputFormat.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("AAA", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("BBB", item.field4);
+
+		inputFormat.nextRecord(item);
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(1.123), item.field3);
+		assertEquals("AAA", item.field4);
+	}
+
+	@Test
+	public void testPojoTypeWithMappingInformation() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,3.123,AAA,BBB\n");
+		wrt.write("456,1.123,BBB,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+		inputFormat.setFields(new boolean[]{true, true, true, true}, new Class<?>[]{Integer.class, Double.class, String.class, String.class});
+		inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field3", "field2", "field4"});
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		validatePojoItem(inputFormat);
+	}
+
+	@Test
+	public void testPojoTypeWithPartialFieldInCSV() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,NODATA,AAA,NODATA,3.123,BBB\n");
+		wrt.write("456,NODATA,BBB,NODATA,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+		inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String.class, Double.class, String.class});
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		validatePojoItem(inputFormat);
+	}
+
+	@Test
+	public void testPojoTypeWithMappingInfoAndPartialField() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,3.123,AAA,BBB\n");
+		wrt.write("456,1.123,BBB,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+		inputFormat.setFields(new boolean[]{true, false, false, true}, new Class[]{Integer.class, String.class});
+		inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field4"});
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		PojoItem item = new PojoItem();
+		inputFormat.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("BBB", item.field4);
+	}
+
+	@Test
+	public void testPojoTypeWithInvalidFieldMapping() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		try {
+			inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2"});
+			fail("The number of POJO fields cannot be same as that of selected CSV fields");
+		} catch (IllegalArgumentException e) {
+			// success
+		}
+
+		try {
+			inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", null, "field4"});
+			fail("Fields mapping cannot contain null.");
+		} catch (NullPointerException e) {
+			// success
+		}
+
+		try {
+			inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", "field3", "field5"});
+			fail("Invalid field name");
+		} catch (IllegalArgumentException e) {
+			// success
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Custom types for testing
+	// --------------------------------------------------------------------------------------------
+
+	public static class PojoItem {
+		public int field1;
+		public String field2;
+		public Double field3;
+		public String field4;
+	}
+
+	public static class PrivatePojoItem {
+		private int field1;
+		private String field2;
+		private Double field3;
+		private String field4;
+
+		public int getField1() {
+			return field1;
+		}
+
+		public void setField1(int field1) {
+			this.field1 = field1;
+		}
+
+		public String getField2() {
+			return field2;
+		}
+
+		public void setField2(String field2) {
+			this.field2 = field2;
+		}
+
+		public Double getField3() {
+			return field3;
+		}
+
+		public void setField3(Double field3) {
+			this.field3 = field3;
+		}
+
+		public String getField4() {
+			return field4;
+		}
+
+		public void setField4(String field4) {
+			this.field4 = field4;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index fd451f7..230cc6b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -55,8 +55,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -91,8 +92,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -128,8 +130,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -165,8 +168,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -202,8 +206,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -239,8 +244,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -279,8 +285,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -315,8 +322,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -353,8 +361,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -378,8 +387,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -404,8 +414,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -429,8 +440,9 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(DEFAULT_PARALLELISM);
 
+		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);