You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/09/28 12:23:08 UTC

flink git commit: [FLINK-2690] [api-breaking] [scala api] [java api] Adds functionality to the CsvInputFormat to find fields defined in a super class of a Pojo. Refactors CsvInputFormat to share code between this format and ScalaCsvInputFormat.

Repository: flink
Updated Branches:
  refs/heads/master 0b3ca57b4 -> 501a9b085


[FLINK-2690] [api-breaking] [scala api] [java api] Adds functionality to the CsvInputFormat to find fields defined in a super class of a Pojo. Refactors CsvInputFormat to share code between this format and ScalaCsvInputFormat.

This closes #1141.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/501a9b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/501a9b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/501a9b08

Branch: refs/heads/master
Commit: 501a9b0854995b3de1c293b3391eeb3048daeef0
Parents: 0b3ca57
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 17 15:28:25 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 28 12:22:27 2015 +0200

----------------------------------------------------------------------
 .../wordcount/BoltTokenizerWordCountPojo.java   |   4 +-
 .../BoltTokenizerWordCountWithNames.java        |   4 +-
 .../api/common/io/GenericCsvInputFormat.java    |   1 -
 .../flink/api/java/io/CommonCsvInputFormat.java | 258 +++++++++++++++++++
 .../flink/api/java/io/CsvInputFormat.java       | 211 +--------------
 .../flink/api/java/io/CsvInputFormatTest.java   | 114 +++++++-
 .../scala/operators/ScalaCsvInputFormat.java    | 204 ++-------------
 .../scala/operators/ScalaCsvOutputFormat.java   |   6 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |  21 +-
 .../api/java/common/PlanBinder.java             |  11 +-
 .../flink/api/scala/io/CsvInputFormatTest.scala |  90 +++++--
 11 files changed, 497 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
index befb18f..20e69db 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
@@ -19,9 +19,9 @@ package org.apache.flink.stormcompatibility.wordcount;
 
 import backtype.storm.topology.IRichBolt;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -121,7 +121,7 @@ public class BoltTokenizerWordCountPojo {
 	private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvironment env) {
 		if (fileOutput) {
 			// read the text file from given input path
-			TypeInformation<Sentence> sourceType = TypeExtractor
+			PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo)TypeExtractor
 					.getForObject(new Sentence(""));
 			return env.createInput(new CsvInputFormat<Sentence>(new Path(
 					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
index 8483f48..e233da1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
@@ -20,11 +20,11 @@ package org.apache.flink.stormcompatibility.wordcount;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -124,7 +124,7 @@ public class BoltTokenizerWordCountWithNames {
 	private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) {
 		if (fileOutput) {
 			// read the text file from given input path
-			TypeInformation<Tuple1<String>> sourceType = TypeExtractor
+			TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
 					.getForObject(new Tuple1<String>(""));
 			return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
 					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 8d979bb..e68d271 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -40,7 +40,6 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.TreeMap;
 
-
 public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
new file mode 100644
index 0000000..444d151
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class CommonCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+	public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+	protected transient Object[] parsedValues;
+
+	private final  Class<OUT> pojoTypeClass;
+
+	private String[] pojoFieldNames;
+
+	private transient PojoTypeInfo<OUT> pojoTypeInfo;
+	private transient Field[] pojoFields;
+
+	public CommonCsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	}
+
+	public CommonCsvInputFormat(
+			Path filePath,
+			String lineDelimiter,
+			String fieldDelimiter,
+			CompositeType<OUT> compositeTypeInfo) {
+		super(filePath);
+
+		setDelimiter(lineDelimiter);
+		setFieldDelimiter(fieldDelimiter);
+
+		Class<?>[] classes = new Class<?>[compositeTypeInfo.getArity()];
+
+		for (int i = 0; i < compositeTypeInfo.getArity(); i++) {
+			classes[i] = compositeTypeInfo.getTypeAt(i).getTypeClass();
+		}
+
+		setFieldTypes(classes);
+
+		if (compositeTypeInfo instanceof PojoTypeInfo) {
+			pojoTypeInfo = (PojoTypeInfo<OUT>) compositeTypeInfo;
+
+			pojoTypeClass = compositeTypeInfo.getTypeClass();
+			setOrderOfPOJOFields(compositeTypeInfo.getFieldNames());
+		} else {
+			pojoTypeClass = null;
+			pojoFieldNames = null;
+		}
+	}
+
+	public void setOrderOfPOJOFields(String[] fieldNames) {
+		Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
+		Preconditions.checkNotNull(fieldNames);
+
+		int includedCount = 0;
+		for (boolean isIncluded : fieldIncluded) {
+			if (isIncluded) {
+				includedCount++;
+			}
+		}
+
+		Preconditions.checkArgument(includedCount == fieldNames.length, includedCount +
+			" CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+		for (String field : fieldNames) {
+			Preconditions.checkNotNull(field, "The field name cannot be null.");
+			Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+				"Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
+		}
+
+		pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length);
+	}
+
+	public void setFieldTypes(Class<?>... fieldTypes) {
+		if (fieldTypes == null || fieldTypes.length == 0) {
+			throw new IllegalArgumentException("Field types must not be null or empty.");
+		}
+
+		setFieldTypesGeneric(fieldTypes);
+	}
+
+	public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
+		Preconditions.checkNotNull(sourceFieldIndices);
+		Preconditions.checkNotNull(fieldTypes);
+
+		checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
+
+		setFieldsGeneric(sourceFieldIndices, fieldTypes);
+	}
+
+	public  void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
+		Preconditions.checkNotNull(sourceFieldMask);
+		Preconditions.checkNotNull(fieldTypes);
+
+		setFieldsGeneric(sourceFieldMask, fieldTypes);
+	}
+
+	public Class<?>[] getFieldTypes() {
+		return super.getGenericFieldTypes();
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+
+		@SuppressWarnings("unchecked")
+		FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+		//throw exception if no field parsers are available
+		if (fieldParsers.length == 0) {
+			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+		}
+
+		// create the value holders
+		this.parsedValues = new Object[fieldParsers.length];
+		for (int i = 0; i < fieldParsers.length; i++) {
+			this.parsedValues[i] = fieldParsers[i].createValue();
+		}
+
+		// left to right evaluation makes access [0] okay
+		// this marker is used to speed up readRecord, so that it doesn't have to check on each call whether the line ending is set to the default
+		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+			this.lineDelimiterIsLinebreak = true;
+		}
+
+		// for POJO type
+		if (pojoTypeClass != null) {
+			pojoFields = new Field[pojoFieldNames.length];
+
+			Map<String, Field> allFields = new HashMap<String, Field>();
+
+			findAllFields(pojoTypeClass, allFields);
+
+			for (int i = 0; i < pojoFieldNames.length; i++) {
+				pojoFields[i] = allFields.get(pojoFieldNames[i]);
+
+				if (pojoFields[i] != null) {
+					pojoFields[i].setAccessible(true);
+				} else {
+					throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
+				}
+			}
+		}
+
+		this.commentCount = 0;
+		this.invalidLineCount = 0;
+	}
+
+	/**
+	 * Finds all declared fields in a class and all its super classes.
+	 *
+	 * @param clazz Class for which all declared fields are found
+	 * @param allFields Map containing all found fields so far
+	 */
+	private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
+		for (Field field: clazz.getDeclaredFields()) {
+			allFields.put(field.getName(), field);
+		}
+
+		if (clazz.getSuperclass() != null) {
+			findAllFields(clazz.getSuperclass(), allFields);
+		}
+	}
+
+	@Override
+	public OUT nextRecord(OUT record) throws IOException {
+		OUT returnRecord = null;
+		do {
+			returnRecord = super.nextRecord(record);
+		} while (returnRecord == null && !reachedEnd());
+
+		return returnRecord;
+	}
+
+	@Override
+	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+		/*
+		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
+		 */
+		//Find windows end line, so find carriage return before the newline
+		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
+			//reduce the number of bytes so that the Carriage return is not taken as data
+			numBytes--;
+		}
+
+		if (commentPrefix != null && commentPrefix.length <= numBytes) {
+			//check record for comments
+			boolean isComment = true;
+			for (int i = 0; i < commentPrefix.length; i++) {
+				if (commentPrefix[i] != bytes[offset + i]) {
+					isComment = false;
+					break;
+				}
+			}
+			if (isComment) {
+				this.commentCount++;
+				return null;
+			}
+		}
+
+		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+			if (pojoTypeClass == null) {
+				// result type is tuple
+				return createTuple(reuse);
+			} else {
+				// result type is POJO
+				for (int i = 0; i < parsedValues.length; i++) {
+					try {
+						pojoFields[i].set(reuse, parsedValues[i]);
+					} catch (IllegalAccessException e) {
+						throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
+					}
+				}
+				return reuse;
+			}
+
+		} else {
+			this.invalidLineCount++;
+			return null;
+		}
+	}
+
+	protected abstract OUT createTuple(OUT reuse);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index ee33484..7d86f39 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -19,224 +19,33 @@
 package org.apache.flink.api.java.io;
 
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-
-
-public class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
+public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
 
 	private static final long serialVersionUID = 1L;
 	
-	/**
-	 * The log.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
-	
-	public static final String DEFAULT_LINE_DELIMITER = "\n";
-
-	public static final String DEFAULT_FIELD_DELIMITER = ",";
-
-	private transient Object[] parsedValues;
-
-	private Class<OUT> pojoTypeClass = null;
-	private String[] pojoFieldsName = null;
-	private transient Field[] pojoFields = null;
-	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
-
-	public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
+		super(filePath, typeInformation);
 	}
 	
-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
-		super(filePath);
-
-		Preconditions.checkArgument(typeInformation instanceof CompositeType);
-		CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
-
-		setDelimiter(lineDelimiter);
-		setFieldDelimiter(fieldDelimiter);
-
-		Class<?>[] classes = new Class<?>[typeInformation.getArity()];
-		for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
-			classes[i] = compositeType.getTypeAt(i).getTypeClass();
-		}
-		setFieldTypes(classes);
-
-		if (typeInformation instanceof PojoTypeInfo) {
-			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
-			pojoTypeClass = typeInformation.getTypeClass();
-			pojoFieldsName = compositeType.getFieldNames();
-			setOrderOfPOJOFields(pojoFieldsName);
-		}
-	}
-
-	public void setOrderOfPOJOFields(String[] fieldsOrder) {
-		Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
-		Preconditions.checkNotNull(fieldsOrder);
-
-		int includedCount = 0;
-		for (boolean isIncluded : fieldIncluded) {
-			if (isIncluded) {
-				includedCount++;
-			}
-		}
-
-		Preconditions.checkArgument(includedCount == fieldsOrder.length, includedCount +
-			" CSV fields and " + fieldsOrder.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
-
-		for (String field : fieldsOrder) {
-			Preconditions.checkNotNull(field, "The field name cannot be null.");
-			Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
-				"Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName());
-		}
-
-		pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
-	}
-
-	public void setFieldTypes(Class<?>... fieldTypes) {
-		if (fieldTypes == null || fieldTypes.length == 0) {
-			throw new IllegalArgumentException("Field types must not be null or empty.");
-		}
-
-		setFieldTypesGeneric(fieldTypes);
-	}
-
-	public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldIndices);
-		Preconditions.checkNotNull(fieldTypes);
-
-		checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
-		setFieldsGeneric(sourceFieldIndices, fieldTypes);
-	}
-
-	public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldMask);
-		Preconditions.checkNotNull(fieldTypes);
-
-		setFieldsGeneric(sourceFieldMask, fieldTypes);
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
+		super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
 	}
 
-	public Class<?>[] getFieldTypes() {
-		return super.getGenericFieldTypes();
-	}
-	
 	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		
-		@SuppressWarnings("unchecked")
-		FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
-		
-		//throw exception if no field parsers are available
-		if (fieldParsers.length == 0) {
-			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
-		}
-		
-		// create the value holders
-		this.parsedValues = new Object[fieldParsers.length];
-		for (int i = 0; i < fieldParsers.length; i++) {
-			this.parsedValues[i] = fieldParsers[i].createValue();
-		}
-		
-		// left to right evaluation makes access [0] okay
-		// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
-		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
-			this.lineDelimiterIsLinebreak = true;
+	protected OUT createTuple(OUT reuse) {
+		Tuple result = (Tuple) reuse;
+		for (int i = 0; i < parsedValues.length; i++) {
+			result.setField(parsedValues[i], i);
 		}
 
-		// for POJO type
-		if (pojoTypeClass != null) {
-			pojoFields = new Field[pojoFieldsName.length];
-			for (int i = 0; i < pojoFieldsName.length; i++) {
-				try {
-					pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
-					pojoFields[i].setAccessible(true);
-				} catch (NoSuchFieldException e) {
-					throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
-				}
-			}
-		}
-		
-		this.commentCount = 0;
-		this.invalidLineCount = 0;
+		return reuse;
 	}
-	
-	@Override
-	public OUT nextRecord(OUT record) throws IOException {
-		OUT returnRecord = null;
-		do {
-			returnRecord = super.nextRecord(record);
-		} while (returnRecord == null && !reachedEnd());
 
-		return returnRecord;
-	}
-	
-	@Override
-	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
-		/*
-		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
-		 */
-		//Find windows end line, so find carriage return before the newline 
-		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
-			//reduce the number of bytes so that the Carriage return is not taken as data
-			numBytes--;
-		}
-		
-		if (commentPrefix != null && commentPrefix.length <= numBytes) {
-			//check record for comments
-			boolean isComment = true;
-			for (int i = 0; i < commentPrefix.length; i++) {
-				if (commentPrefix[i] != bytes[offset + i]) {
-					isComment = false;
-					break;
-				}
-			}
-			if (isComment) {
-				this.commentCount++;
-				return null;
-			}
-		}
-		
-		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			if (pojoTypeClass == null) {
-				// result type is tuple
-				Tuple result = (Tuple) reuse;
-				for (int i = 0; i < parsedValues.length; i++) {
-					result.setField(parsedValues[i], i);
-				}
-			} else {
-				// result type is POJO
-				for (int i = 0; i < parsedValues.length; i++) {
-					try {
-						pojoFields[i].set(reuse, parsedValues[i]);
-					} catch (IllegalAccessException e) {
-						throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
-					}
-				}
-			}
-			return reuse;
-		} else {
-			this.invalidLineCount++;
-			return null;
-		}
-	}
-	
-	
 	@Override
 	public String toString() {
 		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 4f1c339..2c2ffa5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -22,8 +22,9 @@ package org.apache.flink.api.java.io;
 import com.google.common.base.Charsets;
 
 import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -37,6 +38,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
@@ -548,7 +551,7 @@ public class CsvInputFormatTest {
 			
 			format.setFieldDelimiter("|");
 			
-			format.setFields(new int[] {0, 3, 7}, new Class<?>[] {Integer.class, Integer.class, Integer.class});
+			format.setFields(new int[]{0, 3, 7}, new Class<?>[]{Integer.class, Integer.class, Integer.class});
 			
 			
 			format.configure(new Configuration());
@@ -823,7 +826,7 @@ public class CsvInputFormatTest {
 		wrt.close();
 
 		@SuppressWarnings("unchecked")
-		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
 		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.configure(new Configuration());
@@ -846,7 +849,7 @@ public class CsvInputFormatTest {
 		wrt.close();
 
 		@SuppressWarnings("unchecked")
-		TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+		PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
 		CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.configure(new Configuration());
@@ -882,7 +885,7 @@ public class CsvInputFormatTest {
 		wrt.close();
 
 		@SuppressWarnings("unchecked")
-		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
 		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 		inputFormat.setFields(new boolean[]{true, true, true, true}, new Class<?>[]{Integer.class, Double.class, String.class, String.class});
 		inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field3", "field2", "field4"});
@@ -907,7 +910,7 @@ public class CsvInputFormatTest {
 		wrt.close();
 
 		@SuppressWarnings("unchecked")
-		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
 		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 		inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String
 			.class, Double.class, String.class});
@@ -932,7 +935,7 @@ public class CsvInputFormatTest {
 		wrt.close();
 
 		@SuppressWarnings("unchecked")
-		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
 		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 		inputFormat.setFields(new boolean[]{true, false, false, true}, new Class[]{Integer.class, String.class});
 		inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field4"});
@@ -956,7 +959,7 @@ public class CsvInputFormatTest {
 		tempFile.setWritable(true);
 
 		@SuppressWarnings("unchecked")
-		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
 		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		try {
@@ -994,7 +997,7 @@ public class CsvInputFormatTest {
 		writer.write(fileContent);
 		writer.close();
 
-		TypeInformation<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+		CompositeType<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
 		CsvInputFormat<Tuple2<String, String>> inputFormat = new CsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.enableQuotedStringParsing('"');
@@ -1025,7 +1028,7 @@ public class CsvInputFormatTest {
 		writer.write(fileContent);
 		writer.close();
 
-		TypeInformation<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+		TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
 		CsvInputFormat<Tuple2<String, String>> inputFormat = new CsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.enableQuotedStringParsing('"');
@@ -1043,6 +1046,49 @@ public class CsvInputFormatTest {
 		assertEquals("We are\\\" young", record.f1);
 	}
 
+	/**
+	 * Tests that the CSV input format also fills the fields a POJO subclass inherits from its super class.
+	 *
+	 * @throws Exception if the temporary CSV file cannot be written or the records cannot be read
+	 */
+	@Test
+	public void testPojoSubclassType() throws Exception {
+		final String fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2";
+
+		final File tempFile = File.createTempFile("CsvReaderPOJOSubclass", "tmp");
+		tempFile.deleteOnExit();
+
+		OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile));
+		writer.write(fileContent);
+		writer.close();
+
+		@SuppressWarnings("unchecked")
+		PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>)TypeExtractor.createTypeInfo(TwitterPOJO.class);
+		CsvInputFormat<TwitterPOJO> inputFormat = new CsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		List<TwitterPOJO> expected = new ArrayList<>();
+
+		for (String line: fileContent.split("\n")) {
+			String[] elements = line.split(",");
+			expected.add(new TwitterPOJO(elements[0], elements[1], elements[2]));
+		}
+
+		List<TwitterPOJO> actual = new ArrayList<>();
+
+		TwitterPOJO pojo;
+
+		while((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) {
+			actual.add(pojo);
+		}
+
+		assertEquals(expected, actual);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Custom types for testing
 	// --------------------------------------------------------------------------------------------
@@ -1093,4 +1139,70 @@ public class CsvInputFormatTest {
 		}
 	}
 
+	/**
+	 * Simple POJO with two public String fields; serves as the super class of {@link TwitterPOJO}.
+	 */
+	public static class POJO {
+		public String table;
+		public String time;
+
+		public POJO() {
+			this("", "");
+		}
+
+		public POJO(String table, String time) {
+			this.table = table;
+			this.time = time;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof POJO) {
+				POJO other = (POJO) obj;
+				return table.equals(other.table) && time.equals(other.time);
+			} else {
+				return false;
+			}
+		}
+
+		// Overridden together with equals so that equal instances also produce equal hash codes.
+		@Override
+		public int hashCode() {
+			return 31 * table.hashCode() + time.hashCode();
+		}
+	}
+
+	/**
+	 * POJO subclass adding a {@code tweet} field; {@code table} and {@code time} come from {@link POJO}.
+	 */
+	public static class TwitterPOJO extends POJO {
+		public String tweet;
+
+		public TwitterPOJO() {
+			this("", "", "");
+		}
+
+		public TwitterPOJO(String table, String time, String tweet) {
+			super(table, time);
+			this.tweet = tweet;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof TwitterPOJO) {
+				TwitterPOJO other = (TwitterPOJO) obj;
+
+				return super.equals(other) && tweet.equals(other.tweet);
+			} else {
+				return false;
+			}
+		}
+
+		// Extends the super class hash with the additional field, mirroring equals.
+		@Override
+		public int hashCode() {
+			return 31 * super.hashCode() + tweet.hashCode();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index 9adbed8..522cc0c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -21,213 +21,45 @@ package org.apache.flink.api.scala.operators;
 
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.io.CommonCsvInputFormat;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
 
-import org.apache.flink.types.parser.FieldParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-
-public class ScalaCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
-
-	private transient Object[] parsedValues;
+public class ScalaCsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+	private static final long serialVersionUID = -7347888812778968640L;
 
 	private final TupleSerializerBase<OUT> tupleSerializer;
 
-	private Class<OUT> pojoTypeClass = null;
-	private String[] pojoFieldsName = null;
-	private transient Field[] pojoFields = null;
-	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
-
-	public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
-		super(filePath);
+	public ScalaCsvInputFormat(Path filePath, CompositeType<OUT> typeInfo) {
+		super(filePath, typeInfo);
 
-		Class<?>[] classes = new Class[typeInfo.getArity()];
+		Preconditions.checkArgument(typeInfo instanceof PojoTypeInfo || typeInfo instanceof TupleTypeInfoBase,
+			"Only pojo types or tuple types are supported.");
 
 		if (typeInfo instanceof TupleTypeInfoBase) {
-			TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
-			// We can use an empty config here, since we only use the serializer to create
-			// the top-level case class
-			tupleSerializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig());
-
-			for (int i = 0; i < tupleType.getArity(); i++) {
-				classes[i] = tupleType.getTypeAt(i).getTypeClass();
-			}
+			TupleTypeInfoBase<OUT> tupleTypeInfo = (TupleTypeInfoBase<OUT>) typeInfo;
 
-			setFieldTypes(classes);
+			tupleSerializer = (TupleSerializerBase<OUT>)tupleTypeInfo.createSerializer(new ExecutionConfig());
 		} else {
 			tupleSerializer = null;
-			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInfo;
-			pojoTypeClass = typeInfo.getTypeClass();
-			pojoFieldsName = pojoTypeInfo.getFieldNames();
-
-			for (int i = 0, arity = pojoTypeInfo.getArity(); i < arity; i++) {
-				classes[i] = pojoTypeInfo.getTypeAt(i).getTypeClass();
-			}
-
-			setFieldTypes(classes);
-			setOrderOfPOJOFields(pojoFieldsName);
-		}
-	}
-
-	public void setOrderOfPOJOFields(String[] fieldsOrder) {
-		Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO.");
-		Preconditions.checkNotNull(fieldsOrder);
-
-		int includedCount = 0;
-		for (boolean isIncluded : fieldIncluded) {
-			if (isIncluded) {
-				includedCount++;
-			}
-		}
-
-		Preconditions.checkArgument(includedCount == fieldsOrder.length,
-			"The number of selected POJO fields should be the same as that of CSV fields.");
-
-		for (String field : fieldsOrder) {
-			Preconditions.checkNotNull(field, "The field name cannot be null.");
-			Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
-				"The given field name isn't matched to POJO fields.");
-		}
-
-		pojoFieldsName = Arrays.copyOfRange(fieldsOrder, 0, fieldsOrder.length);
-	}
-
-	public void setFieldTypes(Class<?>[] fieldTypes) {
-		if (fieldTypes == null || fieldTypes.length == 0) {
-			throw new IllegalArgumentException("Field types must not be null or empty.");
-		}
-
-		setFieldTypesGeneric(fieldTypes);
-	}
-
-	public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldIndices);
-		Preconditions.checkNotNull(fieldTypes);
-
-		checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
-
-		setFieldsGeneric(sourceFieldIndices, fieldTypes);
-	}
-
-	public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldMask);
-		Preconditions.checkNotNull(fieldTypes);
-
-		setFieldsGeneric(sourceFieldMask, fieldTypes);
-	}
-
-	public Class<?>[] getFieldTypes() {
-		return super.getGenericFieldTypes();
-	}
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-
-		@SuppressWarnings("unchecked")
-		FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
-
-		//throw exception if no field parsers are available
-		if (fieldParsers.length == 0) {
-			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
-		}
-
-		// create the value holders
-		this.parsedValues = new Object[fieldParsers.length];
-		for (int i = 0; i < fieldParsers.length; i++) {
-			this.parsedValues[i] = fieldParsers[i].createValue();
 		}
-
-		// left to right evaluation makes access [0] okay
-		// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
-		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
-			this.lineDelimiterIsLinebreak = true;
-		}
-
-		// for POJO type
-		if (pojoTypeClass != null) {
-			pojoFields = new Field[pojoFieldsName.length];
-			for (int i = 0; i < pojoFieldsName.length; i++) {
-				try {
-					pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
-					pojoFields[i].setAccessible(true);
-				} catch (NoSuchFieldException e) {
-					throw new RuntimeException("There is no field called \"" + pojoFieldsName[i] + "\" in " + pojoTypeClass.getName(), e);
-				}
-			}
-		}
-
-		this.commentCount = 0;
-		this.invalidLineCount = 0;
 	}
 
 	@Override
-	public OUT nextRecord(OUT record) throws IOException {
-		OUT returnRecord = null;
-		do {
-			returnRecord = super.nextRecord(record);
-		} while (returnRecord == null && !reachedEnd());
+	protected OUT createTuple(OUT reuse) {
+		Preconditions.checkNotNull(tupleSerializer, "The tuple serializer must be initialised." +
+			" It is not initialized if the given type was not a " +
+			TupleTypeInfoBase.class.getName() + ".");
 
-		return returnRecord;
+		return tupleSerializer.createInstance(parsedValues);
 	}
 
 	@Override
-	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
-		/*
-		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
-		 */
-		//Find windows end line, so find carriage return before the newline 
-		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
-			//reduce the number of bytes so that the Carriage return is not taken as data
-			numBytes--;
-		}
-
-		if (commentPrefix != null && commentPrefix.length <= numBytes) {
-			//check record for comments
-			boolean isComment = true;
-			for (int i = 0; i < commentPrefix.length; i++) {
-				if (commentPrefix[i] != bytes[offset + i]) {
-					isComment = false;
-					break;
-				}
-			}
-			if (isComment) {
-				this.commentCount++;
-				return null;
-			}
-		}
-
-		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			if (tupleSerializer != null) {
-				return tupleSerializer.createInstance(parsedValues);
-			} else {
-				for (int i = 0; i < pojoFields.length; i++) {
-					try {
-						pojoFields[i].set(reuse, parsedValues[i]);
-					} catch (IllegalAccessException e) {
-						throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldsName[i] + "\"", e);
-					}
-				}
-
-				return reuse;
-			}
-		} else {
-			this.invalidLineCount++;
-			return null;
-		}
+	public String toString() {
+		return "Scala CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
index afcdc4a..e29a6e6 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.CommonCsvInputFormat;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
@@ -51,9 +51,9 @@ public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T>
 
 	// --------------------------------------------------------------------------------------------
 
-	public static final String DEFAULT_LINE_DELIMITER = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+	public static final String DEFAULT_LINE_DELIMITER = CommonCsvInputFormat.DEFAULT_LINE_DELIMITER;
 
-	public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
+	public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CommonCsvInputFormat.DEFAULT_FIELD_DELIMITER);
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 745b0d3..85c5410 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -20,9 +20,11 @@ package org.apache.flink.api.scala
 import java.util.UUID
 
 import com.esotericsoftware.kryo.Serializer
+import com.google.common.base.Preconditions
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.common.{JobID, ExecutionConfig, JobExecutionResult}
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.io._
 import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -53,11 +55,11 @@ import scala.reflect.ClassTag
  *
  * To get an execution environment use the methods on the companion object:
  *
- *  - [[ExecutionEnvironment.getExecutionEnvironment]]
- *  - [[ExecutionEnvironment.createLocalEnvironment]]
- *  - [[ExecutionEnvironment.createRemoteEnvironment]]
+ *  - [[ExecutionEnvironment#getExecutionEnvironment]]
+ *  - [[ExecutionEnvironment#createLocalEnvironment]]
+ *  - [[ExecutionEnvironment#createRemoteEnvironment]]
  *
- *  Use [[ExecutionEnvironment.getExecutionEnvironment]] to get the correct environment depending
+ *  Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct environment depending
  *  on where the program is executed. If it is run inside an IDE a loca environment will be
  *  created. If the program is submitted to a cluster a remote execution environment will
  *  be created.
@@ -294,7 +296,14 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
 
     val typeInfo = implicitly[TypeInformation[T]]
 
-    val inputFormat = new ScalaCsvInputFormat[T](new Path(filePath), typeInfo)
+    Preconditions.checkArgument(
+      typeInfo.isInstanceOf[CompositeType[T]],
+      s"The type $typeInfo has to be a tuple or pojo type.",
+      null)
+
+    val inputFormat = new ScalaCsvInputFormat[T](
+      new Path(filePath),
+      typeInfo.asInstanceOf[CompositeType[T]])
     inputFormat.setDelimiter(lineDelimiter)
     inputFormat.setFieldDelimiter(fieldDelimiter)
     inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
@@ -334,7 +343,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
         " included fields must match.")
       inputFormat.setFields(includedFields, classesBuf.toArray)
     } else {
-      inputFormat.setFieldTypes(classesBuf.toArray)
+      inputFormat.setFieldTypes(classesBuf: _*)
     }
 
     if (pojoFields != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index 69cd501..8ca0405 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -14,6 +14,8 @@ package org.apache.flink.languagebinding.api.java.common;
 
 import java.io.IOException;
 import java.util.HashMap;
+
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.CsvInputFormat;
@@ -256,7 +258,18 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
 	protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException;
 
+	/**
+	 * Creates a CSV data source for the given operation and stores it in {@code sets} under its set ID.
+	 */
 	private void createCsvSource(OperationInfo info) throws IOException {
-		sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path), info.lineDelimiter, info.fieldDelimiter, info.types), info.types).name("CsvSource"));
+		// The CsvInputFormat is handed a CompositeType below, so reject every other type up front.
+		if (!(info.types instanceof CompositeType)) {
+			throw new RuntimeException("The output type of a csv source has to be a tuple or a " +
+				"pojo type. The derived type is " + info.types);
+		}
+
+		sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path),
+			info.lineDelimiter, info.fieldDelimiter, (CompositeType)info.types), info.types)
+			.name("CsvSource"));
 	}
 
 	private void createTextSource(OperationInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/501a9b08/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 0d74515..e49f737 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -20,13 +20,19 @@ package org.apache.flink.api.scala.io
 import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.{FileInputSplit, Path}
 import org.junit.Assert.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
 import org.junit.Test
 
+import scala.collection.mutable.ArrayBuffer
+
 class CsvInputFormatTest {
 
   private final val PATH: Path = new Path("an/ignored/file/")
@@ -45,7 +51,9 @@ class CsvInputFormatTest {
                         "#next|5|6.0|\n"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(String, Integer, Double)](
-        PATH, createTypeInformation[(String, Integer, Double)])
+        PATH,
+        createTypeInformation[(String, Integer, Double)]
+          .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
       format.setDelimiter("\n")
       format.setFieldDelimiter('|')
       format.setCommentPrefix("#")
@@ -85,7 +93,9 @@ class CsvInputFormatTest {
                         "//next|5|6.0|\n"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(String, Integer, Double)](
-        PATH, createTypeInformation[(String, Integer, Double)])
+        PATH,
+        createTypeInformation[(String, Integer, Double)]
+          .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
       format.setDelimiter("\n")
       format.setFieldDelimiter('|')
       format.setCommentPrefix("//")
@@ -121,7 +131,9 @@ class CsvInputFormatTest {
       val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(String, String, String)](
-        PATH, createTypeInformation[(String, String, String)])
+        PATH,
+        createTypeInformation[(String, String, String)]
+          .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
       format.setDelimiter("\n")
       format.setFieldDelimiter("|")
       val parameters = new Configuration
@@ -161,7 +173,9 @@ class CsvInputFormatTest {
       val fileContent = "abc|\"de|f\"|ghijk\n\"a|bc\"||hhg\n|||"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(String, String, String)](
-        PATH, createTypeInformation[(String, String, String)])
+        PATH,
+        createTypeInformation[(String, String, String)]
+          .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
       format.setDelimiter("\n")
       format.enableQuotedStringParsing('"')
       format.setFieldDelimiter("|")
@@ -202,7 +216,9 @@ class CsvInputFormatTest {
       val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(String, String, String)](
-        PATH, createTypeInformation[(String, String, String)])
+        PATH,
+        createTypeInformation[(String, String, String)]
+          .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
       format.setDelimiter("\n")
       format.setFieldDelimiter("|-")
       val parameters = new Configuration
@@ -241,7 +257,8 @@ class CsvInputFormatTest {
       val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)](
-        PATH, createTypeInformation[(Int, Int, Int, Int, Int)])
+        PATH, createTypeInformation[(Int, Int, Int, Int, Int)].
+          asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int, Int)]])
       format.setFieldDelimiter("|")
       format.configure(new Configuration)
       format.open(split)
@@ -276,7 +293,9 @@ class CsvInputFormatTest {
       val fileContent = "111|x|222|x|333|x|444|x|555|x|\n" +
         "666|x|777|x|888|x|999|x|000|x|\n"
       val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(Int, Int)](PATH, createTypeInformation[(Int, Int)])
+      val format = new ScalaCsvInputFormat[(Int, Int)](
+        PATH,
+        createTypeInformation[(Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int)]])
       format.setFieldDelimiter("|x|")
       format.configure(new Configuration)
       format.open(split)
@@ -307,7 +326,7 @@ class CsvInputFormatTest {
       val split = createTempFile(fileContent)
       val format = new ScalaCsvInputFormat[(Int, Int, Int)](
         PATH,
-        createTypeInformation[(Int, Int, Int)])
+        createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]])
       format.setFieldDelimiter("|")
       format.setFields(Array(0, 3, 7),
         Array(classOf[Integer], classOf[Integer], classOf[Integer]): Array[Class[_]])
@@ -339,7 +358,7 @@ class CsvInputFormatTest {
     try {
       val format = new ScalaCsvInputFormat[(Int, Int, Int)](
         PATH,
-        createTypeInformation[(Int, Int, Int)])
+        createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]])
       format.setFieldDelimiter("|")
       try {
         format.setFields(Array(8, 1, 3),
@@ -384,7 +403,7 @@ class CsvInputFormatTest {
       wrt.write(fileContent)
       wrt.close()
       val inputFormat = new ScalaCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
-        createTypeInformation[Tuple1[String]])
+        createTypeInformation[Tuple1[String]].asInstanceOf[CaseClassTypeInfo[Tuple1[String]]])
       val parameters = new Configuration
       inputFormat.configure(parameters)
       inputFormat.setDelimiter(lineBreakerSetup)
@@ -442,7 +461,8 @@ class CsvInputFormatTest {
   def testPOJOType(): Unit = {
     val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
     val tempFile = createTempFile(fileContent)
-    val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+    val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
+      .asInstanceOf[PojoTypeInfo[POJOItem]]
     val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
@@ -457,7 +477,8 @@ class CsvInputFormatTest {
   def testCaseClass(): Unit = {
     val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
     val tempFile = createTempFile(fileContent)
-    val typeInfo: TypeInformation[CaseClassItem] = createTypeInformation[CaseClassItem]
+    val typeInfo: CompositeType[CaseClassItem] = createTypeInformation[CaseClassItem]
+      .asInstanceOf[CompositeType[CaseClassItem]]
     val format = new ScalaCsvInputFormat[CaseClassItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
@@ -472,12 +493,13 @@ class CsvInputFormatTest {
   def testPOJOTypeWithFieldMapping(): Unit = {
     val fileContent = "HELLO,123,3.123\n" + "ABC,456,1.234"
     val tempFile = createTempFile(fileContent)
-    val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+    val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
+      .asInstanceOf[PojoTypeInfo[POJOItem]]
     val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
     format.setFieldDelimiter(',')
-    format.setFieldTypes(Array(classOf[String], classOf[Integer], classOf[java.lang.Double]))
+    format.setFieldTypes(classOf[String], classOf[Integer], classOf[java.lang.Double])
     format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
     format.configure(new Configuration)
     format.open(tempFile)
@@ -489,7 +511,8 @@ class CsvInputFormatTest {
   def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
     val fileContent = "HELLO,123,NODATA,3.123,NODATA\n" + "ABC,456,NODATA,1.234,NODATA"
     val tempFile = createTempFile(fileContent)
-    val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
+    val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
+      .asInstanceOf[PojoTypeInfo[POJOItem]]
     val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
@@ -502,4 +525,43 @@ class CsvInputFormatTest {
 
     validatePOJOItem(format)
   }
+
+  /**
+   * Checks that the Scala CSV input format also fills the fields which the Java test POJO
+   * `TwitterPOJO` inherits from its super class.
+   */
+  @Test
+  def testPOJOSubclassType(): Unit = {
+    val fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2"
+    val tempFile = createTempFile(fileContent)
+    val typeInfo: PojoTypeInfo[TwitterPOJO] = createTypeInformation[TwitterPOJO]
+      .asInstanceOf[PojoTypeInfo[TwitterPOJO]]
+    val format = new ScalaCsvInputFormat[TwitterPOJO](PATH, typeInfo)
+
+    format.setDelimiter('\n')
+    format.setFieldDelimiter(',')
+    format.configure(new Configuration)
+    format.open(tempFile)
+
+    val expected = for (line <- fileContent.split("\n")) yield {
+      val elements = line.split(",")
+      new TwitterPOJO(elements(0), elements(1), elements(2))
+    }
+
+    val actual = ArrayBuffer[TwitterPOJO]()
+    var readNextElement = true
+
+    while (readNextElement) {
+      val element = format.nextRecord(new TwitterPOJO())
+
+      if (element != null) {
+        actual += element
+      } else {
+        readNextElement = false
+      }
+    }
+
+    // JUnit assertion (as in the other tests of this class) instead of the elidable Predef.assert.
+    assertTrue(expected.sameElements(actual))
+  }
 }