Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:13 UTC
[44/47] flink git commit: [FLINK-3848] [table] Add projection push down for StreamTableSource.
[FLINK-3848] [table] Add projection push down for StreamTableSource.
- Add plan tests for projection push down.
- Implement ProjectableTableSource in CsvTableSource.
- Refactor RowCsvInputFormat.
This closes #2810.
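
For illustration, a minimal sketch of the behavior this commit enables, mirroring the new stream tests below (the setup follows the test code; nothing here is part of the commit itself):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.utils.CommonTestData

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // CSV source with four fields: first, id, score, last
    tEnv.registerTableSource("csvTable", CommonTestData.getCsvTableSource)
    // Only three fields are referenced, so PushProjectIntoStreamTableSourceScanRule
    // rewrites the scan to read just (last, id, score) instead of all four columns.
    tEnv.ingest("csvTable")
      .select('last, 'id.floor(), 'score * 2)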
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef575e87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef575e87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef575e87
Branch: refs/heads/master
Commit: ef575e87e8a77d56673d40d98d3d66c6511d57ab
Parents: d5c7bf6
Author: tonycox <to...@gmail.com>
Authored: Thu Dec 8 12:00:40 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 16 16:41:21 2016 +0100
----------------------------------------------------------------------
.../flink/api/java/io/RowCsvInputFormat.java | 113 +++++++++-----
.../api/java/io/RowCsvInputFormatTest.java | 153 +++++++++++-------
.../flink/table/plan/nodes/FlinkRel.scala | 26 +++-
.../table/plan/nodes/dataset/DataSetRel.scala | 31 +---
.../plan/nodes/datastream/DataStreamCalc.scala | 2 +-
.../datastream/StreamTableSourceScan.scala | 15 +-
.../flink/table/plan/rules/FlinkRuleSets.scala | 3 +-
...ushProjectIntoBatchTableSourceScanRule.scala | 4 +-
...shProjectIntoStreamTableSourceScanRule.scala | 85 ++++++++++
.../flink/table/sources/CsvTableSource.scala | 31 +++-
.../batch/ProjectableTableSourceITCase.scala | 2 +-
.../batch/ProjectableTableSourceTest.scala | 155 +++++++++++++++++++
.../api/scala/batch/TableSourceITCase.scala | 64 ++++----
.../api/scala/stream/TableSourceITCase.scala | 76 +++++----
.../flink/table/utils/CommonTestData.scala | 63 ++++++++
15 files changed, 615 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index 34233f5..af2e9e4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -20,76 +20,108 @@ package org.apache.flink.api.java.io;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;
import org.apache.flink.types.parser.FieldParser;
+import java.util.Arrays;
+
@PublicEvolving
-public class RowCsvInputFormat extends CsvInputFormat<Row> {
+public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultTypeQueryable<Row> {
private static final long serialVersionUID = 1L;
private int arity;
+ private TypeInformation[] fieldTypeInfos;
+ private int[] fieldPosMap;
private boolean emptyColumnAsNull;
- public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, boolean[] includedFieldsMask, boolean emptyColumnAsNull) {
+ public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypeInfos, String lineDelimiter, String fieldDelimiter, int[] selectedFields, boolean emptyColumnAsNull) {
+
super(filePath);
- if (rowTypeInfo.getArity() == 0) {
- throw new IllegalArgumentException("Row arity must be greater than 0.");
+ this.arity = fieldTypeInfos.length;
+ if (arity == 0) {
+ throw new IllegalArgumentException("At least one field must be specified");
}
- this.arity = rowTypeInfo.getArity();
-
- boolean[] fieldsMask;
- if (includedFieldsMask != null) {
- fieldsMask = includedFieldsMask;
- } else {
- fieldsMask = createDefaultMask(arity);
+ if (arity != selectedFields.length) {
+ throw new IllegalArgumentException("Number of field types and selected fields must be the same");
}
+
+ this.fieldTypeInfos = fieldTypeInfos;
+ this.fieldPosMap = toFieldPosMap(selectedFields);
this.emptyColumnAsNull = emptyColumnAsNull;
+
+ boolean[] fieldsMask = toFieldMask(selectedFields);
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
- setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo));
+ setFieldsGeneric(fieldsMask, extractTypeClasses(fieldTypeInfos));
}
- public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, int[] includedFieldsMask) {
- this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, toBoolMask(includedFieldsMask), false);
+ public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, String lineDelimiter, String fieldDelimiter, int[] selectedFields) {
+ this(filePath, fieldTypes, lineDelimiter, fieldDelimiter, selectedFields, false);
}
- public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter) {
- this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, null, false);
+ public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, String lineDelimiter, String fieldDelimiter) {
+ this(filePath, fieldTypes, lineDelimiter, fieldDelimiter, sequentialScanOrder(fieldTypes.length));
}
- public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
- this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask, false);
+ public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, int[] selectedFields) {
+ this(filePath, fieldTypes, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, selectedFields);
}
- public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
- this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask);
+ public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes, boolean emptyColumnAsNull) {
+ this(filePath, fieldTypes, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, sequentialScanOrder(fieldTypes.length), emptyColumnAsNull);
}
- public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean emptyColumnAsNull) {
- this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null, emptyColumnAsNull);
+ public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes) {
+ this(filePath, fieldTypes, false);
}
- public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
- this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null);
+ private static Class<?>[] extractTypeClasses(TypeInformation[] fieldTypes) {
+ Class<?>[] classes = new Class<?>[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+ classes[i] = fieldTypes[i].getTypeClass();
+ }
+ return classes;
}
- private static Class<?>[] extractTypeClasses(RowTypeInfo rowTypeInfo) {
- Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
- for (int i = 0; i < rowTypeInfo.getArity(); i++) {
- classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+ private static int[] sequentialScanOrder(int arity) {
+ int[] sequentialOrder = new int[arity];
+ for (int i = 0; i < arity; i++) {
+ sequentialOrder[i] = i;
}
- return classes;
+ return sequentialOrder;
}
- private static boolean[] toBoolMask(int[] includedFieldsMask) {
- if (includedFieldsMask == null) {
- return null;
- } else {
- return toBooleanMask(includedFieldsMask);
+ private static boolean[] toFieldMask(int[] selectedFields) {
+ int maxField = 0;
+ for (int selectedField : selectedFields) {
+ maxField = Math.max(maxField, selectedField);
}
+ boolean[] mask = new boolean[maxField + 1];
+ Arrays.fill(mask, false);
+
+ for (int selectedField : selectedFields) {
+ mask[selectedField] = true;
+ }
+ return mask;
+ }
+
+ private static int[] toFieldPosMap(int[] selectedFields) {
+ int[] fieldIdxs = Arrays.copyOf(selectedFields, selectedFields.length);
+ Arrays.sort(fieldIdxs);
+
+ int[] fieldPosMap = new int[selectedFields.length];
+ for (int i = 0; i < selectedFields.length; i++) {
+ int pos = Arrays.binarySearch(fieldIdxs, selectedFields[i]);
+ fieldPosMap[pos] = i;
+ }
+
+ return fieldPosMap;
}
@Override
@@ -129,14 +161,14 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> {
if (fieldIncluded[field]) {
// parse field
- FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
+ FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[fieldPosMap[output]];
int latestValidPos = startPos;
startPos = parser.resetErrorStateAndParse(
bytes,
startPos,
limit,
fieldDelimiter,
- holders[output]);
+ holders[fieldPosMap[output]]);
if (!isLenient() && (parser.getErrorState() != FieldParser.ParseErrorState.NONE)) {
// the error state EMPTY_COLUMN is ignored
@@ -145,14 +177,14 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> {
field, new String(bytes, offset, numBytes), parser.getClass().getSimpleName(), parser.getErrorState()));
}
}
- holders[output] = parser.getLastResult();
+ holders[fieldPosMap[output]] = parser.getLastResult();
// check parse result:
// the result is null if it is invalid
// or empty with emptyColumnAsNull enabled
if (startPos < 0 ||
(emptyColumnAsNull && (parser.getErrorState().equals(FieldParser.ParseErrorState.EMPTY_COLUMN)))) {
- holders[output] = null;
+ holders[fieldPosMap[output]] = null;
startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter);
}
output++;
@@ -171,4 +203,9 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> {
}
return true;
}
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return new RowTypeInfo(this.fieldTypeInfos);
+ }
}
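The fieldPosMap above is the subtle part of the refactoring: the field parsers always run over the file's columns in ascending order, while the output row must follow the order given by selectedFields. A minimal standalone sketch of that mapping (logic transcribed from toFieldPosMap above; the indices {7, 3, 0} are taken from the new testScanOrder test):

    import java.util.Arrays

    object FieldPosMapSketch {
      def toFieldPosMap(selectedFields: Array[Int]): Array[Int] = {
        val sorted = selectedFields.sorted  // the order in which fields are parsed
        val posMap = new Array[Int](selectedFields.length)
        for (i <- selectedFields.indices) {
          // the field parsed at its sorted position must land at output position i
          posMap(Arrays.binarySearch(sorted, selectedFields(i))) = i
        }
        posMap
      }

      def main(args: Array[String]): Unit = {
        // selectedFields = {7, 3, 0}: columns 0, 3, 7 are parsed in file order but
        // must be written to output positions 2, 1, 0 respectively.
        println(toFieldPosMap(Array(7, 3, 0)).mkString(", "))  // prints: 2, 1, 0
      }
    }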
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index a68e81e..b819641 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
@@ -68,9 +69,9 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);
+ TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
format.setLenient(false);
Configuration parameters = new Configuration();
format.configure(new Configuration());
@@ -161,12 +162,12 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO);
+ BasicTypeInfo.DOUBLE_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
format.setCommentPrefix("#");
format.configure(new Configuration());
format.open(split);
@@ -200,12 +201,12 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO);
+ BasicTypeInfo.DOUBLE_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
format.setCommentPrefix("//");
format.configure(new Configuration());
format.open(split);
@@ -233,12 +234,12 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO);
+ BasicTypeInfo.STRING_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
format.configure(new Configuration());
format.open(split);
@@ -273,12 +274,12 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO);
+ BasicTypeInfo.STRING_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
format.configure(new Configuration());
format.enableQuotedStringParsing('@');
format.open(split);
@@ -314,12 +315,12 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO);
+ BasicTypeInfo.STRING_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
format.setFieldDelimiter("|-");
format.configure(new Configuration());
format.open(split);
@@ -355,14 +356,14 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO);
+ BasicTypeInfo.INT_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
format.setFieldDelimiter("|");
format.configure(new Configuration());
@@ -405,7 +406,7 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.BYTE_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
@@ -413,9 +414,9 @@ public class RowCsvInputFormatTest {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.SHORT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO);
+ BasicTypeInfo.STRING_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, true);
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, true);
format.setFieldDelimiter(",");
format.configure(new Configuration());
format.open(split);
@@ -439,14 +440,14 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO);
+ BasicTypeInfo.DOUBLE_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
@@ -480,11 +481,11 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO);
+ BasicTypeInfo.INT_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
@@ -513,15 +514,15 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO);
+ BasicTypeInfo.INT_TYPE_INFO};
RowCsvInputFormat format = new RowCsvInputFormat(
PATH,
- typeInfo,
- new boolean[]{true, false, false, true, false, false, false, true});
+ fieldTypes,
+ new int[]{0, 3, 7});
format.setFieldDelimiter("|x|");
format.configure(new Configuration());
format.open(split);
@@ -552,14 +553,14 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO);
+ BasicTypeInfo.INT_TYPE_INFO};
RowCsvInputFormat format = new RowCsvInputFormat(
PATH,
- typeInfo,
+ fieldTypes,
new int[]{0, 3, 7});
format.setFieldDelimiter("|");
format.configure(new Configuration());
@@ -591,15 +592,15 @@ public class RowCsvInputFormatTest {
FileInputSplit split = RowCsvInputFormatTest.createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO);
+ BasicTypeInfo.INT_TYPE_INFO};
RowCsvInputFormat format = new RowCsvInputFormat(
PATH,
- typeInfo,
- new boolean[]{true, false, false, true, false, false, false, true});
+ fieldTypes,
+ new int[]{0, 3, 7});
format.setFieldDelimiter("&&");
format.configure(new Configuration());
format.open(split);
@@ -660,14 +661,14 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO);
+ BasicTypeInfo.DOUBLE_TYPE_INFO};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
format.setSkipFirstLineAsHeader(true);
format.setFieldDelimiter(",");
format.configure(new Configuration());
@@ -749,14 +750,14 @@ public class RowCsvInputFormatTest {
writer.write(fileContent);
writer.close();
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO);
+ BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat inputFormat = new RowCsvInputFormat(
new Path(tempFile.toURI().toString()),
- typeInfo,
- new boolean[]{true, false, true});
+ fieldTypes,
+ new int[]{0, 2});
inputFormat.enableQuotedStringParsing('"');
inputFormat.setFieldDelimiter("|");
inputFormat.setDelimiter('\n');
@@ -781,11 +782,11 @@ public class RowCsvInputFormatTest {
writer.write(fileContent);
writer.close();
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO);
+ BasicTypeInfo.STRING_TYPE_INFO};
- RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+ RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), fieldTypes);
inputFormat.enableQuotedStringParsing('"');
inputFormat.setFieldDelimiter("|");
inputFormat.setDelimiter('\n');
@@ -806,13 +807,13 @@ public class RowCsvInputFormatTest {
FileInputSplit split = createTempFile(fileContent);
- RowTypeInfo typeInfo = new RowTypeInfo(
+ TypeInformation[] fieldTypes = new TypeInformation[]{
SqlTimeTypeInfo.DATE,
SqlTimeTypeInfo.TIME,
SqlTimeTypeInfo.TIMESTAMP,
- SqlTimeTypeInfo.TIMESTAMP);
+ SqlTimeTypeInfo.TIMESTAMP};
- RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes);
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
@@ -838,6 +839,48 @@ public class RowCsvInputFormatTest {
assertTrue(format.reachedEnd());
}
+ @Test
+ public void testScanOrder() throws Exception {
+ String fileContent =
+ // first row
+ "111|222|333|444|555|666|777|888|999|000|\n" +
+ // second row
+ "000|999|888|777|666|555|444|333|222|111|";
+ FileInputSplit split = createTempFile(fileContent);
+
+ TypeInformation[] fieldTypes = new TypeInformation[]{
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO};
+
+ int[] order = new int[]{7, 3, 0};
+ RowCsvInputFormat format = new RowCsvInputFormat(
+ PATH,
+ fieldTypes,
+ order);
+
+ format.setFieldDelimiter("|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ // check first row
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(888, result.getField(0));
+ assertEquals(444, result.getField(1));
+ assertEquals(111, result.getField(2));
+
+ // check second row
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(333, result.getField(0));
+ assertEquals(777, result.getField(1));
+ assertEquals(0, result.getField(2));
+
+ }
+
private static FileInputSplit createTempFile(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();
@@ -859,9 +902,9 @@ public class RowCsvInputFormatTest {
wrt.write(fileContent);
wrt.close();
- RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO);
+ TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO};
- RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+ RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), fieldTypes);
inputFormat.configure(new Configuration());
inputFormat.setDelimiter(lineBreakerSetup);
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index c8211a2..835f316 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -18,10 +18,12 @@
package org.apache.flink.table.plan.nodes
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.MapRunner
@@ -100,4 +102,26 @@ trait FlinkRel {
genFunction.returnType)
}
+
+ private[flink] def estimateRowSize(rowType: RelDataType): Double = {
+
+ rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
+ t match {
+ case SqlTypeName.TINYINT => s + 1
+ case SqlTypeName.SMALLINT => s + 2
+ case SqlTypeName.INTEGER => s + 4
+ case SqlTypeName.BIGINT => s + 8
+ case SqlTypeName.BOOLEAN => s + 1
+ case SqlTypeName.FLOAT => s + 4
+ case SqlTypeName.DOUBLE => s + 8
+ case SqlTypeName.VARCHAR => s + 12
+ case SqlTypeName.CHAR => s + 1
+ case SqlTypeName.DECIMAL => s + 12
+ case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
+ case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
+ case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
+ case _ => throw TableException(s"Unsupported data type encountered: $t")
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
index 210ae03..02138cf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,14 +19,10 @@
package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.plan.nodes.FlinkRel
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
-import org.apache.flink.table.plan.nodes.FlinkRel
-
-import scala.collection.JavaConversions._
trait DataSetRel extends RelNode with FlinkRel {
@@ -45,27 +41,4 @@ trait DataSetRel extends RelNode with FlinkRel {
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
- private[flink] def estimateRowSize(rowType: RelDataType): Double = {
-
- rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
- t match {
- case SqlTypeName.TINYINT => s + 1
- case SqlTypeName.SMALLINT => s + 2
- case SqlTypeName.INTEGER => s + 4
- case SqlTypeName.BIGINT => s + 8
- case SqlTypeName.BOOLEAN => s + 1
- case SqlTypeName.FLOAT => s + 4
- case SqlTypeName.DOUBLE => s + 8
- case SqlTypeName.VARCHAR => s + 12
- case SqlTypeName.CHAR => s + 1
- case SqlTypeName.DECIMAL => s + 12
- case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
- case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
- case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
- case _ => throw TableException(s"Unsupported data type encountered: $t")
- }
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 774c17b..43f1fb6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -39,7 +39,7 @@ class DataStreamCalc(
traitSet: RelTraitSet,
input: RelNode,
rowRelDataType: RelDataType,
- calcProgram: RexProgram,
+ private[flink] val calcProgram: RexProgram,
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
with FlinkCalc
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index f86a54b..702b6eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -19,7 +19,8 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.TableSourceTable
@@ -32,7 +33,7 @@ class StreamTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
- tableSource: StreamTableSource[_])
+ val tableSource: StreamTableSource[_])
extends StreamScan(cluster, traitSet, table) {
override def deriveRowType() = {
@@ -40,6 +41,11 @@ class StreamTableSourceScan(
flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
}
+ override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+ val rowCnt = metadata.getRowCount(this)
+ planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+ }
+
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new StreamTableSourceScan(
cluster,
@@ -49,6 +55,11 @@ class StreamTableSourceScan(
)
}
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .item("fields", tableSource.getFieldsNames.mkString(", "))
+ }
+
override def translateToPlan(
tableEnv: StreamTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
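A worked example of the cost computed above, under assumed statistics (the row count of 1000 is hypothetical; the per-field byte sizes follow estimateRowSize in FlinkRel.scala):

    object ScanCostSketch {
      def main(args: Array[String]): Unit = {
        val rowCnt = 1000.0
        // estimateRowSize per field: VARCHAR = 12, INTEGER = 4, DOUBLE = 8
        val fullRowSize = 12 + 4 + 8 + 12  // (first, id, score, last)
        val projectedRowSize = 12 + 4 + 8  // (last, id, score) after push down
        // makeCost(rowCount, cpu, io): io drops from 36000.0 to 24000.0, so the
        // planner favors the projected scan.
        println(s"full io = ${rowCnt * fullRowSize}, projected io = ${rowCnt * projectedRowSize}")
      }
    }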
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index bcd12a4..8c8b304 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -155,7 +155,8 @@ object FlinkRuleSets {
DataStreamUnionRule.INSTANCE,
DataStreamValuesRule.INSTANCE,
DataStreamCorrelateRule.INSTANCE,
- StreamTableSourceScanRule.INSTANCE
+ StreamTableSourceScanRule.INSTANCE,
+ PushProjectIntoStreamTableSourceScanRule.INSTANCE
)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 5d91c62..7adec48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -46,7 +46,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
- // if no fields can be projected, there is no need to transform subtree
+ // if not all fields are used, push the projection into the scan; otherwise keep the original plan
if (scan.tableSource.getNumberOfFields != usedFields.length) {
val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
val newTableSource = originTableSource.projectFields(usedFields)
@@ -62,8 +62,8 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
usedFields,
calc.getCluster.getRexBuilder)
- // if project merely returns its input and doesn't exist filter, remove datasetCalc nodes
if (newCalcProgram.isTrivial) {
+ // drop the Calc if the transformed program merely returns its input and contains no filter
call.transformTo(newScan)
} else {
val newCalc = new DataSetCalc(
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
new file mode 100644
index 0000000..654fb8f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
+import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
+
+/**
+ * Rule that pushes a projection into a [[StreamTableSourceScan]].
+ */
+class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
+ operand(classOf[DataStreamCalc],
+ operand(classOf[StreamTableSourceScan], none())),
+ "PushProjectIntoStreamTableSourceScanRule") {
+
+ /** Rule must only match if the [[StreamTableSource]] is a [[ProjectableTableSource]]. */
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+ scan.tableSource match {
+ case _: ProjectableTableSource[_] => true
+ case _ => false
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val calc = call.rel(0).asInstanceOf[DataStreamCalc]
+ val scan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+
+ val usedFields = extractRefInputFields(calc.calcProgram)
+
+ // if not all fields are used, push the projection into the scan; otherwise keep the original plan
+ if (scan.tableSource.getNumberOfFields != usedFields.length) {
+ val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+ val newTableSource = originTableSource.projectFields(usedFields)
+ val newScan = new StreamTableSourceScan(
+ scan.getCluster,
+ scan.getTraitSet,
+ scan.getTable,
+ newTableSource.asInstanceOf[StreamTableSource[_]])
+
+ val newProgram = rewriteRexProgram(
+ calc.calcProgram,
+ newScan.getRowType,
+ usedFields,
+ calc.getCluster.getRexBuilder)
+
+ if (newProgram.isTrivial) {
+ // drop the Calc if the transformed program merely returns its input and contains no filter
+ call.transformTo(newScan)
+ } else {
+ val newCalc = new DataStreamCalc(
+ calc.getCluster,
+ calc.getTraitSet,
+ newScan,
+ calc.getRowType,
+ newProgram,
+ description)
+ call.transformTo(newCalc)
+ }
+ }
+ }
+}
+
+object PushProjectIntoStreamTableSourceScanRule {
+ val INSTANCE: RelOptRule = new PushProjectIntoStreamTableSourceScanRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index 3f4e395..20e8bb9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -54,7 +54,8 @@ class CsvTableSource(
ignoreComments: String = null,
lenient: Boolean = false)
extends BatchTableSource[Row]
- with StreamTableSource[Row] {
+ with StreamTableSource[Row]
+ with ProjectableTableSource[Row] {
/**
* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
@@ -74,6 +75,8 @@ class CsvTableSource(
private val returnType = new RowTypeInfo(fieldTypes: _*)
+ private var selectedFields: Array[Int] = fieldTypes.indices.toArray
+
/**
* Returns the data of the table as a [[DataSet]] of [[Row]].
*
@@ -106,8 +109,32 @@ class CsvTableSource(
streamExecEnv.createInput(createCsvInput(), returnType)
}
+ /** Returns a copy of the [[TableSource]] with the ability to project fields. */
+ override def projectFields(fields: Array[Int]): CsvTableSource = {
+
+ val newFieldNames: Array[String] = fields.map(fieldNames(_))
+ val newFieldTypes: Array[TypeInformation[_]] = fields.map(fieldTypes(_))
+
+ val source = new CsvTableSource(path,
+ newFieldNames,
+ newFieldTypes,
+ fieldDelim,
+ rowDelim,
+ quoteCharacter,
+ ignoreFirstLine,
+ ignoreComments,
+ lenient)
+ source.selectedFields = fields
+ source
+ }
+
private def createCsvInput(): RowCsvInputFormat = {
- val inputFormat = new RowCsvInputFormat(new Path(path), returnType, rowDelim, fieldDelim)
+ val inputFormat = new RowCsvInputFormat(
+ new Path(path),
+ fieldTypes,
+ rowDelim,
+ fieldDelim,
+ selectedFields)
inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
inputFormat.setLenient(lenient)
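A minimal sketch of projectFields in isolation, assuming the CSV test schema (first, id, score, last) from CommonTestData below:

    import org.apache.flink.table.utils.CommonTestData

    object ProjectFieldsSketch {
      def main(args: Array[String]): Unit = {
        val source = CommonTestData.getCsvTableSource
        // select field indexes 3, 1, 2 of (first, id, score, last)
        val projected = source.projectFields(Array(3, 1, 2))
        // the copy reads only the selected columns and reports the narrowed schema
        println(projected.getFieldsNames.mkString(", "))  // last, id, score
      }
    }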
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
index ec4dc59..37407c8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.api.scala.batch
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
@@ -27,6 +26,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
new file mode 100644
index 0000000..b3097cf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class ProjectableTableSourceTest extends TableTestBase {
+
+ private val projectedFields: Array[String] = Array("last", "id", "score")
+ private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+ @Test
+ def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = batchTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .scan(tableName)
+ .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(tableName, projectedFields),
+ term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testBatchProjectableSourceScanPlanSQL(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = batchTestUtil()
+
+ util.tEnv.registerTableSource(tableName, csvTable)
+
+ val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(tableName, projectedFields),
+ term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = batchTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .scan(tableName)
+ .select('id, 'score, 'first)
+
+ val expected = sourceBatchTableNode(tableName, noCalcFields)
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testStreamProjectableSourceScanPlanTableApi(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = streamTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .ingest(tableName)
+ .select('last, 'id.floor(), 'score * 2)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(tableName, projectedFields),
+ term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testStreamProjectableSourceScanPlanSQL(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = streamTestUtil()
+
+ util.tEnv.registerTableSource(tableName, csvTable)
+
+ val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(tableName, projectedFields),
+ term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = streamTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .ingest(tableName)
+ .select('id, 'score, 'first)
+
+ val expected = sourceStreamTableNode(tableName, noCalcFields)
+ util.verifyTable(result, expected)
+ }
+
+ def tableSource: (CsvTableSource, String) = {
+ val csvTable = CommonTestData.getCsvTableSource
+ val tableName = "csvTable"
+ (csvTable, tableName)
+ }
+
+ def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
+ s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+ }
+
+ def sourceStreamTableNode(sourceName: String, fields: Array[String]): String = {
+ s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index 0b2c8fc..e324aad 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -18,8 +18,6 @@
package org.apache.flink.table.api.scala.batch
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-
import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
@@ -29,8 +27,9 @@ import org.apache.flink.types.Row
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
+import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.utils.CommonTestData
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit.Test
@@ -83,43 +82,11 @@ class TableSourceITCase(
@Test
def testCsvTableSource(): Unit = {
- val csvRecords = Seq(
- "First#Id#Score#Last",
- "Mike#1#12.3#Smith",
- "Bob#2#45.6#Taylor",
- "Sam#3#7.89#Miller",
- "Peter#4#0.12#Smith",
- "% Just a comment",
- "Liz#5#34.5#Williams",
- "Sally#6#6.78#Miller",
- "Alice#7#90.1#Smith",
- "Kelly#8#2.34#Williams"
- )
-
- val tempFile = File.createTempFile("csv-test", "tmp")
- tempFile.deleteOnExit()
- val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
- tmpWriter.write(csvRecords.mkString("$"))
- tmpWriter.close()
+ val csvTable = CommonTestData.getCsvTableSource
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val csvTable = new CsvTableSource(
- tempFile.getAbsolutePath,
- Array("first", "id", "score", "last"),
- Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO
- ),
- fieldDelim = "#",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
-
tEnv.registerTableSource("csvTable", csvTable)
val results = tEnv.sql(
"SELECT last, sum(score), max(id) FROM csvTable GROUP BY last").collect()
@@ -132,6 +99,31 @@ class TableSourceITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ @Test
+ def testCsvTableSourceWithProjection(): Unit = {
+ val csvTable = CommonTestData.getCsvTableSource
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ tEnv.registerTableSource("csvTable", csvTable)
+
+ val results = tEnv
+ .scan("csvTable")
+ .select('last, 'id.floor(), 'score * 2)
+ .collect()
+
+ val expected = Seq(
+ "Smith,1,24.6",
+ "Taylor,2,91.2",
+ "Miller,3,15.78",
+ "Smith,4,0.24",
+ "Williams,5,69.0",
+ "Miller,6,13.56",
+ "Smith,7,180.2",
+ "Williams,8,4.68").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
}
class TestBatchTableSource extends BatchTableSource[Row] {
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index ce910db..316f2f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -18,14 +18,12 @@
package org.apache.flink.table.api.scala.stream
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.stream.utils.StreamITCase
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{CsvTableSource, StreamTableSource}
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.api.scala._
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment
import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -33,6 +31,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.utils.CommonTestData
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
@@ -87,46 +86,14 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
}
@Test
- def testCsvTableSource(): Unit = {
-
- val csvRecords = Seq(
- "First#Id#Score#Last",
- "Mike#1#12.3#Smith",
- "Bob#2#45.6#Taylor",
- "Sam#3#7.89#Miller",
- "Peter#4#0.12#Smith",
- "% Just a comment",
- "Liz#5#34.5#Williams",
- "Sally#6#6.78#Miller",
- "Alice#7#90.1#Smith",
- "Kelly#8#2.34#Williams"
- )
-
- val tempFile = File.createTempFile("csv-test", "tmp")
- tempFile.deleteOnExit()
- val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
- tmpWriter.write(csvRecords.mkString("$"))
- tmpWriter.close()
+ def testCsvTableSourceSQL(): Unit = {
+
+ val csvTable = CommonTestData.getCsvTableSource
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val csvTable = new CsvTableSource(
- tempFile.getAbsolutePath,
- Array("first", "id", "score", "last"),
- Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO
- ),
- fieldDelim = "#",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
-
tEnv.registerTableSource("csvTable", csvTable)
tEnv.sql(
"SELECT last, score, id FROM csvTable WHERE id < 4 ")
@@ -141,6 +108,35 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
"Miller,7.89,3")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testCsvTableSourceTableAPI(): Unit = {
+
+ val csvTable = CommonTestData.getCsvTableSource
+ StreamITCase.testResults = mutable.MutableList()
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ tEnv.registerTableSource("csvTable", csvTable)
+ tEnv.ingest("csvTable")
+ .select('last, 'id.floor(), 'score * 2)
+ .toDataStream[Row]
+ .addSink(new StreamITCase.StringSink)
+
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Smith,1,24.6",
+ "Taylor,2,91.2",
+ "Miller,3,15.78",
+ "Smith,4,0.24",
+ "Williams,5,69.0",
+ "Miller,6,13.56",
+ "Smith,7,180.2",
+ "Williams,8,4.68")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef575e87/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
new file mode 100644
index 0000000..349b369
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.sources.CsvTableSource
+
+object CommonTestData {
+
+ def getCsvTableSource: CsvTableSource = {
+ val csvRecords = Seq(
+ "First#Id#Score#Last",
+ "Mike#1#12.3#Smith",
+ "Bob#2#45.6#Taylor",
+ "Sam#3#7.89#Miller",
+ "Peter#4#0.12#Smith",
+ "% Just a comment",
+ "Liz#5#34.5#Williams",
+ "Sally#6#6.78#Miller",
+ "Alice#7#90.1#Smith",
+ "Kelly#8#2.34#Williams"
+ )
+
+ val tempFile = File.createTempFile("csv-test", "tmp")
+ tempFile.deleteOnExit()
+ val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
+ tmpWriter.write(csvRecords.mkString("$"))
+ tmpWriter.close()
+
+ new CsvTableSource(
+ tempFile.getAbsolutePath,
+ Array("first", "id", "score", "last"),
+ Array(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO
+ ),
+ fieldDelim = "#",
+ rowDelim = "$",
+ ignoreFirstLine = true,
+ ignoreComments = "%"
+ )
+ }
+}