Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:13 UTC

[44/47] flink git commit: [FLINK-3848] [table] Add projection push down for StreamTableSource.

[FLINK-3848] [table] Add projection push down for StreamTableSource.

- Add plan tests for projection push down.
- Implement ProjectableTableSource in CsvTableSource.
- Refactor RowCsvInputFormat.

This closes #2810.
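
With the new rule in place, a streaming Table API or SQL query that uses only a
subset of a ProjectableTableSource's fields reads just those fields from the
source. A minimal sketch, mirroring the new testCsvTableSourceTableAPI test added
in this commit (CommonTestData is the test utility introduced below):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.utils.CommonTestData

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    tEnv.registerTableSource("csvTable", CommonTestData.getCsvTableSource)

    // Only 'last', 'id' and 'score' are parsed from the CSV file;
    // the unused 'first' column is projected away at the source.
    val result = tEnv
      .ingest("csvTable")
      .select('last, 'id.floor(), 'score * 2)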


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef575e87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef575e87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef575e87

Branch: refs/heads/master
Commit: ef575e87e8a77d56673d40d98d3d66c6511d57ab
Parents: d5c7bf6
Author: tonycox <to...@gmail.com>
Authored: Thu Dec 8 12:00:40 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 16 16:41:21 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/RowCsvInputFormat.java    | 113 +++++++++-----
 .../api/java/io/RowCsvInputFormatTest.java      | 153 +++++++++++-------
 .../flink/table/plan/nodes/FlinkRel.scala       |  26 +++-
 .../table/plan/nodes/dataset/DataSetRel.scala   |  31 +---
 .../plan/nodes/datastream/DataStreamCalc.scala  |   2 +-
 .../datastream/StreamTableSourceScan.scala      |  15 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   3 +-
 ...ushProjectIntoBatchTableSourceScanRule.scala |   4 +-
 ...shProjectIntoStreamTableSourceScanRule.scala |  85 ++++++++++
 .../flink/table/sources/CsvTableSource.scala    |  31 +++-
 .../batch/ProjectableTableSourceITCase.scala    |   2 +-
 .../batch/ProjectableTableSourceTest.scala      | 155 +++++++++++++++++++
 .../api/scala/batch/TableSourceITCase.scala     |  64 ++++----
 .../api/scala/stream/TableSourceITCase.scala    |  76 +++++----
 .../flink/table/utils/CommonTestData.scala      |  63 ++++++++
 15 files changed, 615 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index 34233f5..af2e9e4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -20,76 +20,108 @@ package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.parser.FieldParser;
 
+import java.util.Arrays;
+
 @PublicEvolving
-public class RowCsvInputFormat extends CsvInputFormat<Row> {
+public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultTypeQueryable<Row> {
 
 	private static final long serialVersionUID = 1L;
 
 	private int arity;
+	private TypeInformation[] fieldTypeInfos;
+	private int[] fieldPosMap;
 	private boolean emptyColumnAsNull;
 
-	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, boolean[] includedFieldsMask, boolean emptyColumnAsNull) {
+	public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypeInfos, String lineDelimiter, String fieldDelimiter, int[] selectedFields, boolean emptyColumnAsNull) {
+
 		super(filePath);
-		if (rowTypeInfo.getArity() == 0) {
-			throw new IllegalArgumentException("Row arity must be greater than 0.");
+		this.arity = fieldTypeInfos.length;
+		if (arity == 0) {
+			throw new IllegalArgumentException("At least one field must be specified");
 		}
-		this.arity = rowTypeInfo.getArity();
-
-		boolean[] fieldsMask;
-		if (includedFieldsMask != null) {
-			fieldsMask = includedFieldsMask;
-		} else {
-			fieldsMask = createDefaultMask(arity);
+		if (arity != selectedFields.length) {
+			throw new IllegalArgumentException("Number of field types and selected fields must be the same");
 		}
+
+		this.fieldTypeInfos = fieldTypeInfos;
+		this.fieldPosMap = toFieldPosMap(selectedFields);
 		this.emptyColumnAsNull = emptyColumnAsNull;
+
+		boolean[] fieldsMask = toFieldMask(selectedFields);
+
 		setDelimiter(lineDelimiter);
 		setFieldDelimiter(fieldDelimiter);
-		setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo));
+		setFieldsGeneric(fieldsMask, extractTypeClasses(fieldTypeInfos));
 	}
 
-	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, int[] includedFieldsMask) {
-		this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, toBoolMask(includedFieldsMask), false);
+	public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, String lineDelimiter, String fieldDelimiter, int[] selectedFields) {
+		this(filePath, fieldTypes, lineDelimiter, fieldDelimiter, selectedFields, false);
 	}
 
-	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter) {
-		this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, null, false);
+	public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, String lineDelimiter, String fieldDelimiter) {
+		this(filePath, fieldTypes, lineDelimiter, fieldDelimiter, sequentialScanOrder(fieldTypes.length));
 	}
 
-	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
-		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask, false);
+	public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, int[] selectedFields) {
+		this(filePath, fieldTypes, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, selectedFields);
 	}
 
-	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
-		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask);
+	public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, boolean emptyColumnAsNull) {
+		this(filePath, fieldTypes, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, sequentialScanOrder(fieldTypes.length), emptyColumnAsNull);
 	}
 
-	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean emptyColumnAsNull) {
-		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null, emptyColumnAsNull);
+	public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes) {
+		this(filePath, fieldTypes, false);
 	}
 
-	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
-		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null);
+	private static Class<?>[] extractTypeClasses(TypeInformation[] fieldTypes) {
+		Class<?>[] classes = new Class<?>[fieldTypes.length];
+		for (int i = 0; i < fieldTypes.length; i++) {
+			classes[i] = fieldTypes[i].getTypeClass();
+		}
+		return classes;
 	}
 
-	private static Class<?>[] extractTypeClasses(RowTypeInfo rowTypeInfo) {
-		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
-		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
-			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+	private static int[] sequentialScanOrder(int arity) {
+		int[] sequentialOrder = new int[arity];
+		for (int i = 0; i < arity; i++) {
+			sequentialOrder[i] = i;
 		}
-		return classes;
+		return sequentialOrder;
 	}
 
-	private static boolean[] toBoolMask(int[] includedFieldsMask) {
-		if (includedFieldsMask == null) {
-			return null;
-		} else {
-			return toBooleanMask(includedFieldsMask);
+	private static boolean[] toFieldMask(int[] selectedFields) {
+		int maxField = 0;
+		for (int selectedField : selectedFields) {
+			maxField = Math.max(maxField, selectedField);
 		}
+		boolean[] mask = new boolean[maxField + 1];
+		Arrays.fill(mask, false);
+
+		for (int selectedField : selectedFields) {
+			mask[selectedField] = true;
+		}
+		return mask;
+	}
+
+	private static int[] toFieldPosMap(int[] selectedFields) {
+		int[] fieldIdxs = Arrays.copyOf(selectedFields, selectedFields.length);
+		Arrays.sort(fieldIdxs);
+
+		int[] fieldPosMap = new int[selectedFields.length];
+		for (int i = 0; i < selectedFields.length; i++) {
+			int pos = Arrays.binarySearch(fieldIdxs, selectedFields[i]);
+			fieldPosMap[pos] = i;
+		}
+
+		return fieldPosMap;
 	}
 
 	@Override
@@ -129,14 +161,14 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> {
 
 			if (fieldIncluded[field]) {
 				// parse field
-				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
+				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[fieldPosMap[output]];
 				int latestValidPos = startPos;
 				startPos = parser.resetErrorStateAndParse(
 					bytes,
 					startPos,
 					limit,
 					fieldDelimiter,
-					holders[output]);
+					holders[fieldPosMap[output]]);
 
 				if (!isLenient() && (parser.getErrorState() != FieldParser.ParseErrorState.NONE)) {
 					// the error state EMPTY_COLUMN is ignored
@@ -145,14 +177,14 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> {
 							field, new String(bytes, offset, numBytes), parser.getClass().getSimpleName(), parser.getErrorState()));
 					}
 				}
-				holders[output] = parser.getLastResult();
+				holders[fieldPosMap[output]] = parser.getLastResult();
 
 				// check parse result:
 				// the result is null if it is invalid
 				// or empty with emptyColumnAsNull enabled
 				if (startPos < 0 ||
 					(emptyColumnAsNull && (parser.getErrorState().equals(FieldParser.ParseErrorState.EMPTY_COLUMN)))) {
-					holders[output] = null;
+					holders[fieldPosMap[output]] = null;
 					startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter);
 				}
 				output++;
@@ -171,4 +203,9 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> {
 		}
 		return true;
 	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return new RowTypeInfo(this.fieldTypeInfos);
+	}
 }
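
The reworked RowCsvInputFormat takes the source column indices to read
(selectedFields) instead of a boolean mask, and emits the fields in exactly that
order via fieldPosMap. A small sketch based on the new testScanOrder test; the
file path is a placeholder:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.io.RowCsvInputFormat
    import org.apache.flink.core.fs.Path

    val fieldTypes: Array[TypeInformation[_]] = Array(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)

    // Read CSV columns 7, 3 and 0, in that order. For the line
    // "111|222|333|444|555|666|777|888|999|000|" the produced Row is (888, 444, 111).
    val format = new RowCsvInputFormat(
      new Path("/tmp/input.csv"),   // placeholder path
      fieldTypes,
      "\n",
      "|",
      Array(7, 3, 0))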

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index a68e81e..b819641 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.io;
 import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -68,9 +69,9 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);
+		TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
 		format.setLenient(false);
 		Configuration parameters = new Configuration();
 		format.configure(new Configuration());
@@ -161,12 +162,12 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.STRING_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.DOUBLE_TYPE_INFO);
+			BasicTypeInfo.DOUBLE_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
 		format.setCommentPrefix("#");
 		format.configure(new Configuration());
 		format.open(split);
@@ -200,12 +201,12 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.STRING_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.DOUBLE_TYPE_INFO);
+			BasicTypeInfo.DOUBLE_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
 		format.setCommentPrefix("//");
 		format.configure(new Configuration());
 		format.open(split);
@@ -233,12 +234,12 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.STRING_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
+			BasicTypeInfo.STRING_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
 		format.configure(new Configuration());
 		format.open(split);
 
@@ -273,12 +274,12 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.STRING_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
+			BasicTypeInfo.STRING_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
 		format.configure(new Configuration());
 		format.enableQuotedStringParsing('@');
 		format.open(split);
@@ -314,12 +315,12 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.STRING_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
+			BasicTypeInfo.STRING_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
 		format.setFieldDelimiter("|-");
 		format.configure(new Configuration());
 		format.open(split);
@@ -355,14 +356,14 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO);
+			BasicTypeInfo.INT_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
 
 		format.setFieldDelimiter("|");
 		format.configure(new Configuration());
@@ -405,7 +406,7 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.BOOLEAN_TYPE_INFO,
 			BasicTypeInfo.BYTE_TYPE_INFO,
 			BasicTypeInfo.DOUBLE_TYPE_INFO,
@@ -413,9 +414,9 @@ public class RowCsvInputFormatTest {
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.LONG_TYPE_INFO,
 			BasicTypeInfo.SHORT_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
+			BasicTypeInfo.STRING_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, true);
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, true);
 		format.setFieldDelimiter(",");
 		format.configure(new Configuration());
 		format.open(split);
@@ -439,14 +440,14 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.DOUBLE_TYPE_INFO,
 			BasicTypeInfo.DOUBLE_TYPE_INFO,
 			BasicTypeInfo.DOUBLE_TYPE_INFO,
 			BasicTypeInfo.DOUBLE_TYPE_INFO,
-			BasicTypeInfo.DOUBLE_TYPE_INFO);
+			BasicTypeInfo.DOUBLE_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
 		format.setFieldDelimiter("|");
 		format.configure(new Configuration());
 		format.open(split);
@@ -480,11 +481,11 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO);
+			BasicTypeInfo.INT_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
 		format.setFieldDelimiter("|");
 		format.configure(new Configuration());
 		format.open(split);
@@ -513,15 +514,15 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO);
+			BasicTypeInfo.INT_TYPE_INFO};
 
 		RowCsvInputFormat format = new RowCsvInputFormat(
 			PATH,
-			typeInfo,
-			new boolean[]{true, false, false, true, false, false, false, true});
+			fieldTypes,
+			new int[]{0, 3, 7});
 		format.setFieldDelimiter("|x|");
 		format.configure(new Configuration());
 		format.open(split);
@@ -552,14 +553,14 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO);
+			BasicTypeInfo.INT_TYPE_INFO};
 
 		RowCsvInputFormat format = new RowCsvInputFormat(
 			PATH,
-			typeInfo,
+			fieldTypes,
 			new int[]{0, 3, 7});
 		format.setFieldDelimiter("|");
 		format.configure(new Configuration());
@@ -591,15 +592,15 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = RowCsvInputFormatTest.createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO);
+			BasicTypeInfo.INT_TYPE_INFO};
 
 		RowCsvInputFormat format = new RowCsvInputFormat(
 			PATH,
-			typeInfo,
-			new boolean[]{true, false, false, true, false, false, false, true});
+			fieldTypes,
+			new int[]{0, 3, 7});
 		format.setFieldDelimiter("&&");
 		format.configure(new Configuration());
 		format.open(split);
@@ -660,14 +661,14 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.DOUBLE_TYPE_INFO);
+			BasicTypeInfo.DOUBLE_TYPE_INFO};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
 		format.setSkipFirstLineAsHeader(true);
 		format.setFieldDelimiter(",");
 		format.configure(new Configuration());
@@ -749,14 +750,14 @@ public class RowCsvInputFormatTest {
 		writer.write(fileContent);
 		writer.close();
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
+			BasicTypeInfo.STRING_TYPE_INFO};
 
 		RowCsvInputFormat inputFormat = new RowCsvInputFormat(
 			new Path(tempFile.toURI().toString()),
-			typeInfo,
-			new boolean[]{true, false, true});
+			fieldTypes,
+			new int[]{0, 2});
 		inputFormat.enableQuotedStringParsing('"');
 		inputFormat.setFieldDelimiter("|");
 		inputFormat.setDelimiter('\n');
@@ -781,11 +782,11 @@ public class RowCsvInputFormatTest {
 		writer.write(fileContent);
 		writer.close();
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
+			BasicTypeInfo.STRING_TYPE_INFO};
 
-		RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+		RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), fieldTypes);
 		inputFormat.enableQuotedStringParsing('"');
 		inputFormat.setFieldDelimiter("|");
 		inputFormat.setDelimiter('\n');
@@ -806,13 +807,13 @@ public class RowCsvInputFormatTest {
 
 		FileInputSplit split = createTempFile(fileContent);
 
-		RowTypeInfo typeInfo = new RowTypeInfo(
+		TypeInformation[] fieldTypes = new TypeInformation[]{
 			SqlTimeTypeInfo.DATE,
 			SqlTimeTypeInfo.TIME,
 			SqlTimeTypeInfo.TIMESTAMP,
-			SqlTimeTypeInfo.TIMESTAMP);
+			SqlTimeTypeInfo.TIMESTAMP};
 
-		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
 		format.setFieldDelimiter("|");
 		format.configure(new Configuration());
 		format.open(split);
@@ -838,6 +839,48 @@ public class RowCsvInputFormatTest {
 		assertTrue(format.reachedEnd());
 	}
 
+	@Test
+	public void testScanOrder() throws Exception {
+		String fileContent =
+			// first row
+			"111|222|333|444|555|666|777|888|999|000|\n" +
+			// second row
+			"000|999|888|777|666|555|444|333|222|111|";
+		FileInputSplit split = createTempFile(fileContent);
+
+		TypeInformation[] fieldTypes = new TypeInformation[]{
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO};
+
+		int[] order = new int[]{7, 3, 0};
+		RowCsvInputFormat format = new RowCsvInputFormat(
+			PATH,
+			fieldTypes,
+			order);
+
+		format.setFieldDelimiter("|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		// check first row
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(888, result.getField(0));
+		assertEquals(444, result.getField(1));
+		assertEquals(111, result.getField(2));
+
+		// check second row
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(333, result.getField(0));
+		assertEquals(777, result.getField(1));
+		assertEquals(0, result.getField(2));
+
+	}
+
 	private static FileInputSplit createTempFile(String content) throws IOException {
 		File tempFile = File.createTempFile("test_contents", "tmp");
 		tempFile.deleteOnExit();
@@ -859,9 +902,9 @@ public class RowCsvInputFormatTest {
 		wrt.write(fileContent);
 		wrt.close();
 
-		RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO);
+		TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO};
 
-		RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+		RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), fieldTypes);
 		inputFormat.configure(new Configuration());
 		inputFormat.setDelimiter(lineBreakerSetup);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index c8211a2..835f316 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.plan.nodes
 
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.MapRunner
 
@@ -100,4 +102,26 @@ trait FlinkRel {
       genFunction.returnType)
 
   }
+
+  private[flink] def estimateRowSize(rowType: RelDataType): Double = {
+
+    rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
+      t match {
+        case SqlTypeName.TINYINT => s + 1
+        case SqlTypeName.SMALLINT => s + 2
+        case SqlTypeName.INTEGER => s + 4
+        case SqlTypeName.BIGINT => s + 8
+        case SqlTypeName.BOOLEAN => s + 1
+        case SqlTypeName.FLOAT => s + 4
+        case SqlTypeName.DOUBLE => s + 8
+        case SqlTypeName.VARCHAR => s + 12
+        case SqlTypeName.CHAR => s + 1
+        case SqlTypeName.DECIMAL => s + 12
+        case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
+        case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
+        case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
+        case _ => throw TableException(s"Unsupported data type encountered: $t")
+      }
+    }
+  }
 }
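
As a concrete reading of the weights above: a row of type (INTEGER, DOUBLE, VARCHAR)
is estimated at 4 + 8 + 12 = 24 bytes. StreamTableSourceScan.computeSelfCost below
multiplies this per-row estimate by the row count, so a scan whose projection drops
wide fields is costed cheaper than the unprojected scan.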

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
index 210ae03..02138cf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,14 +19,10 @@
 package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.plan.nodes.FlinkRel
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
-import org.apache.flink.table.plan.nodes.FlinkRel
-
-import scala.collection.JavaConversions._
 
 trait DataSetRel extends RelNode with FlinkRel {
 
@@ -45,27 +41,4 @@ trait DataSetRel extends RelNode with FlinkRel {
      tableEnv: BatchTableEnvironment,
      expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
 
-  private[flink] def estimateRowSize(rowType: RelDataType): Double = {
-
-    rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
-      t match {
-        case SqlTypeName.TINYINT => s + 1
-        case SqlTypeName.SMALLINT => s + 2
-        case SqlTypeName.INTEGER => s + 4
-        case SqlTypeName.BIGINT => s + 8
-        case SqlTypeName.BOOLEAN => s + 1
-        case SqlTypeName.FLOAT => s + 4
-        case SqlTypeName.DOUBLE => s + 8
-        case SqlTypeName.VARCHAR => s + 12
-        case SqlTypeName.CHAR => s + 1
-        case SqlTypeName.DECIMAL => s + 12
-        case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
-        case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
-        case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
-        case _ => throw TableException(s"Unsupported data type encountered: $t")
-      }
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 774c17b..43f1fb6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -39,7 +39,7 @@ class DataStreamCalc(
     traitSet: RelTraitSet,
     input: RelNode,
     rowRelDataType: RelDataType,
-    calcProgram: RexProgram,
+    private[flink] val calcProgram: RexProgram,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
   with FlinkCalc

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index f86a54b..702b6eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
@@ -32,7 +33,7 @@ class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    tableSource: StreamTableSource[_])
+    val tableSource: StreamTableSource[_])
   extends StreamScan(cluster, traitSet, table) {
 
   override def deriveRowType() = {
@@ -40,6 +41,11 @@ class StreamTableSourceScan(
     flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
   }
 
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new StreamTableSourceScan(
       cluster,
@@ -49,6 +55,11 @@ class StreamTableSourceScan(
     )
   }
 
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("fields", tableSource.getFieldsNames.mkString(", "))
+  }
+
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index bcd12a4..8c8b304 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -155,7 +155,8 @@ object FlinkRuleSets {
       DataStreamUnionRule.INSTANCE,
       DataStreamValuesRule.INSTANCE,
       DataStreamCorrelateRule.INSTANCE,
-      StreamTableSourceScanRule.INSTANCE
+      StreamTableSourceScanRule.INSTANCE,
+      PushProjectIntoStreamTableSourceScanRule.INSTANCE
   )
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 5d91c62..7adec48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -46,7 +46,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
 
     val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
 
-    // if no fields can be projected, there is no need to transform subtree
+    // only push the projection down if it removes fields, otherwise keep the original plan
     if (scan.tableSource.getNumberOfFields != usedFields.length) {
       val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
       val newTableSource = originTableSource.projectFields(usedFields)
@@ -62,8 +62,8 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
         usedFields,
         calc.getCluster.getRexBuilder)
 
-      // if project merely returns its input and doesn't exist filter, remove datasetCalc nodes
       if (newCalcProgram.isTrivial) {
+        // drop the Calc if the transformed program merely forwards its input and has no filter
         call.transformTo(newScan)
       } else {
         val newCalc = new DataSetCalc(

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
new file mode 100644
index 0000000..654fb8f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
+import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
+
+/**
+  * Rule that pushes a projection into a [[StreamTableSourceScan]].
+  */
+class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+    operand(classOf[StreamTableSourceScan], none())),
+  "PushProjectIntoStreamTableSourceScanRule") {
+
+  /** The rule only matches if the [[StreamTableSource]] is also a [[ProjectableTableSource]]. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+
+    val usedFields = extractRefInputFields(calc.calcProgram)
+
+    // only push the projection down if it removes fields, otherwise keep the original plan
+    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+      val newTableSource = originTableSource.projectFields(usedFields)
+      val newScan = new StreamTableSourceScan(
+        scan.getCluster,
+        scan.getTraitSet,
+        scan.getTable,
+        newTableSource.asInstanceOf[StreamTableSource[_]])
+
+      val newProgram = rewriteRexProgram(
+        calc.calcProgram,
+        newScan.getRowType,
+        usedFields,
+        calc.getCluster.getRexBuilder)
+
+      if (newProgram.isTrivial) {
+        // drop the Calc if the transformed program merely forwards its input and has no filter
+        call.transformTo(newScan)
+      } else {
+        val newCalc = new DataStreamCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newProgram,
+          description)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushProjectIntoStreamTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushProjectIntoStreamTableSourceScanRule
+}
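
Applied to the CSV source used in the tests (fields first, id, score, last), a query
such as SELECT last, FLOOR(id), score * 2 is rewritten so that the scan itself only
exposes the used columns. The new plan tests below expect, roughly:

    DataStreamCalc(select=[last, FLOOR(id) AS EXPR$1, *(score, 2) AS EXPR$2])
      StreamTableSourceScan(table=[[csvTable]], fields=[last, id, score])

instead of a scan over all four fields. When the remaining program is trivial
(testStreamProjectableSourceScanNoIdentityCalc), the Calc node is dropped entirely.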

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index 3f4e395..20e8bb9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -54,7 +54,8 @@ class CsvTableSource(
     ignoreComments: String = null,
     lenient: Boolean = false)
   extends BatchTableSource[Row]
-  with StreamTableSource[Row] {
+  with StreamTableSource[Row]
+  with ProjectableTableSource[Row] {
 
   /**
     * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
@@ -74,6 +75,8 @@ class CsvTableSource(
 
   private val returnType = new RowTypeInfo(fieldTypes: _*)
 
+  private var selectedFields: Array[Int] = fieldTypes.indices.toArray
+
   /**
     * Returns the data of the table as a [[DataSet]] of [[Row]].
     *
@@ -106,8 +109,32 @@ class CsvTableSource(
     streamExecEnv.createInput(createCsvInput(), returnType)
   }
 
+  /** Returns a copy of this [[TableSource]] that reads only the given fields. */
+  override def projectFields(fields: Array[Int]): CsvTableSource = {
+
+    val newFieldNames: Array[String] = fields.map(fieldNames(_))
+    val newFieldTypes: Array[TypeInformation[_]] = fields.map(fieldTypes(_))
+
+    val source = new CsvTableSource(path,
+      newFieldNames,
+      newFieldTypes,
+      fieldDelim,
+      rowDelim,
+      quoteCharacter,
+      ignoreFirstLine,
+      ignoreComments,
+      lenient)
+    source.selectedFields = fields
+    source
+  }
+
   private def createCsvInput(): RowCsvInputFormat = {
-    val inputFormat = new RowCsvInputFormat(new Path(path), returnType, rowDelim, fieldDelim)
+    val inputFormat = new RowCsvInputFormat(
+      new Path(path),
+      fieldTypes,
+      rowDelim,
+      fieldDelim,
+      selectedFields)
 
     inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
     inputFormat.setLenient(lenient)
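
projectFields keeps the original file layout but narrows what is parsed: the returned
copy remembers the selected source columns and reports only the projected field names
and types. A small sketch, assuming the CommonTestData source added in this commit:

    import org.apache.flink.table.sources.CsvTableSource
    import org.apache.flink.table.utils.CommonTestData

    // Source fields: (first, id, score, last) at positions 0..3.
    val fullSource: CsvTableSource = CommonTestData.getCsvTableSource

    // Keep only 'last', 'id' and 'score'; the underlying RowCsvInputFormat
    // skips the 'first' column when reading the file.
    val projected: CsvTableSource = fullSource.projectFields(Array(3, 1, 2))
    // projected.getFieldsNames == Array("last", "id", "score")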

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
index ec4dc59..37407c8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.api.scala.batch
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
@@ -27,6 +26,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
new file mode 100644
index 0000000..b3097cf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class ProjectableTableSourceTest extends TableTestBase {
+
+  private val projectedFields: Array[String] = Array("last", "id", "score")
+  private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  @Test
+  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(tableName, projectedFields),
+      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanPlanSQL(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = batchTestUtil()
+
+    util.tEnv.registerTableSource(tableName, csvTable)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = sourceBatchTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .ingest(tableName)
+      .select('last, 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanPlanSQL(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = streamTestUtil()
+
+    util.tEnv.registerTableSource(tableName, csvTable)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .ingest(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = sourceStreamTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  def tableSource: (CsvTableSource, String) = {
+    val csvTable = CommonTestData.getCsvTableSource
+    val tableName = "csvTable"
+    (csvTable, tableName)
+  }
+
+  def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def sourceStreamTableNode(sourceName: String, fields: Array[String]): String = {
+    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index 0b2c8fc..e324aad 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api.scala.batch
 
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-
 import org.apache.flink.api.common.io.GenericInputFormat
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
@@ -29,8 +27,9 @@ import org.apache.flink.types.Row
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
+import org.apache.flink.table.sources.BatchTableSource
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.utils.CommonTestData
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
@@ -83,43 +82,11 @@ class TableSourceITCase(
   @Test
   def testCsvTableSource(): Unit = {
 
-    val csvRecords = Seq(
-      "First#Id#Score#Last",
-      "Mike#1#12.3#Smith",
-      "Bob#2#45.6#Taylor",
-      "Sam#3#7.89#Miller",
-      "Peter#4#0.12#Smith",
-      "% Just a comment",
-      "Liz#5#34.5#Williams",
-      "Sally#6#6.78#Miller",
-      "Alice#7#90.1#Smith",
-      "Kelly#8#2.34#Williams"
-    )
-
-    val tempFile = File.createTempFile("csv-test", "tmp")
-    tempFile.deleteOnExit()
-    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
-    tmpWriter.write(csvRecords.mkString("$"))
-    tmpWriter.close()
+    val csvTable = CommonTestData.getCsvTableSource
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val csvTable = new CsvTableSource(
-      tempFile.getAbsolutePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
-
     tEnv.registerTableSource("csvTable", csvTable)
     val results = tEnv.sql(
       "SELECT last, sum(score), max(id) FROM csvTable GROUP BY last").collect()
@@ -132,6 +99,31 @@ class TableSourceITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testCsvTableSourceWithProjection(): Unit = {
+    val csvTable = CommonTestData.getCsvTableSource
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    tEnv.registerTableSource("csvTable", csvTable)
+
+    val results = tEnv
+      .scan("csvTable")
+      .select('last, 'id.floor(), 'score * 2)
+      .collect()
+
+    val expected = Seq(
+      "Smith,1,24.6",
+      "Taylor,2,91.2",
+      "Miller,3,15.78",
+      "Smith,4,0.24",
+      "Williams,5,69.0",
+      "Miller,6,13.56",
+      "Smith,7,180.2",
+      "Williams,8,4.68").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 class TestBatchTableSource extends BatchTableSource[Row] {

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index ce910db..316f2f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.api.scala.stream
 
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{CsvTableSource, StreamTableSource}
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.api.scala._
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -33,6 +31,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.utils.CommonTestData
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -87,46 +86,14 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  def testCsvTableSource(): Unit = {
-
-    val csvRecords = Seq(
-      "First#Id#Score#Last",
-      "Mike#1#12.3#Smith",
-      "Bob#2#45.6#Taylor",
-      "Sam#3#7.89#Miller",
-      "Peter#4#0.12#Smith",
-      "% Just a comment",
-      "Liz#5#34.5#Williams",
-      "Sally#6#6.78#Miller",
-      "Alice#7#90.1#Smith",
-      "Kelly#8#2.34#Williams"
-    )
-
-    val tempFile = File.createTempFile("csv-test", "tmp")
-    tempFile.deleteOnExit()
-    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
-    tmpWriter.write(csvRecords.mkString("$"))
-    tmpWriter.close()
+  def testCsvTableSourceSQL(): Unit = {
+
+    val csvTable = CommonTestData.getCsvTableSource
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val csvTable = new CsvTableSource(
-      tempFile.getAbsolutePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
-
     tEnv.registerTableSource("csvTable", csvTable)
     tEnv.sql(
       "SELECT last, score, id FROM csvTable WHERE id < 4 ")
@@ -141,6 +108,35 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
       "Miller,7.89,3")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testCsvTableSourceTableAPI(): Unit = {
+
+    val csvTable = CommonTestData.getCsvTableSource
+    StreamITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    tEnv.registerTableSource("csvTable", csvTable)
+    tEnv.ingest("csvTable")
+      .select('last, 'id.floor(), 'score * 2)
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Smith,1,24.6",
+      "Taylor,2,91.2",
+      "Miller,3,15.78",
+      "Smith,4,0.24",
+      "Williams,5,69.0",
+      "Miller,6,13.56",
+      "Smith,7,180.2",
+      "Williams,8,4.68")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
new file mode 100644
index 0000000..349b369
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.sources.CsvTableSource
+
+object CommonTestData {
+
+  def getCsvTableSource: CsvTableSource = {
+    val csvRecords = Seq(
+      "First#Id#Score#Last",
+      "Mike#1#12.3#Smith",
+      "Bob#2#45.6#Taylor",
+      "Sam#3#7.89#Miller",
+      "Peter#4#0.12#Smith",
+      "% Just a comment",
+      "Liz#5#34.5#Williams",
+      "Sally#6#6.78#Miller",
+      "Alice#7#90.1#Smith",
+      "Kelly#8#2.34#Williams"
+    )
+
+    val tempFile = File.createTempFile("csv-test", "tmp")
+    tempFile.deleteOnExit()
+    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
+    tmpWriter.write(csvRecords.mkString("$"))
+    tmpWriter.close()
+
+    new CsvTableSource(
+      tempFile.getAbsolutePath,
+      Array("first", "id", "score", "last"),
+      Array(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.DOUBLE_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO
+      ),
+      fieldDelim = "#",
+      rowDelim = "$",
+      ignoreFirstLine = true,
+      ignoreComments = "%"
+    )
+  }
+}