You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 02:12:50 UTC
[flink] 01/03: [hotfix][table] Improve testing implementation for
the new projection push down
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 71e64989ee0b7109271ced5e1af03cc3694783e9
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:40:17 2020 +0800
[hotfix][table] Improve testing implementation for the new projection push down
---
.../apache/flink/table/utils/TableSchemaUtils.java | 20 ++
.../flink/table/utils/TableSchemaUtilsTest.java | 48 +++
.../TestProjectableValuesTableFactory.java | 326 ---------------------
.../planner/factories/TestValuesTableFactory.java | 79 ++++-
.../PushProjectIntoTableSourceScanRuleTest.java | 6 +-
.../org.apache.flink.table.factories.Factory | 1 -
.../planner/plan/stream/sql/TableScanTest.xml | 2 +-
.../planner/plan/batch/sql/TableSourceTest.scala | 4 +-
.../planner/plan/stream/sql/TableSourceTest.scala | 20 +-
.../plan/stream/table/TableSourceTest.scala | 20 +-
.../planner/runtime/batch/sql/CalcITCase.scala | 11 +-
.../planner/runtime/stream/sql/CalcITCase.scala | 10 +-
.../planner/runtime/utils/BatchTestBase.scala | 3 +-
.../planner/runtime/utils/StreamingTestBase.scala | 5 +-
14 files changed, 170 insertions(+), 385 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 4863cb2..67ee125 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions;
import java.util.List;
import java.util.Optional;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Utilities to {@link TableSchema}.
*/
@@ -62,6 +64,24 @@ public class TableSchemaUtils {
}
/**
+ * Creates a new {@link TableSchema} with the projected fields from another {@link TableSchema}.
+ * The new {@link TableSchema} doesn't contain any primary key or watermark information.
+ *
+ * @see org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
+ */
+ public static TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
+ checkArgument(!containsGeneratedColumns(tableSchema), "It's illegal to project on a schema contains computed columns.");
+ TableSchema.Builder schemaBuilder = TableSchema.builder();
+ List<TableColumn> tableColumns = tableSchema.getTableColumns();
+ for (int[] fieldPath : projectedFields) {
+ checkArgument(fieldPath.length == 1, "Nested projection push down is not supported yet.");
+ TableColumn column = tableColumns.get(fieldPath[0]);
+ schemaBuilder.field(column.getName(), column.getType());
+ }
+ return schemaBuilder.build();
+ }
+
+ /**
* Returns true if there are any generated columns in the given {@link TableColumn}.
*/
public static boolean containsGeneratedColumns(TableSchema schema) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
index e96ddd9..3e760b1 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
@@ -70,4 +70,52 @@ public class TableSchemaUtilsTest {
exceptionRule.expectMessage("Constraint ct2 to drop does not exist");
TableSchemaUtils.dropConstraint(oriSchema, "ct2");
}
+
+ @Test
+ public void testInvalidProjectSchema() {
+ {
+ TableSchema schema = TableSchema.builder()
+ .field("a", DataTypes.INT().notNull())
+ .field("b", DataTypes.STRING())
+ .field("c", DataTypes.INT(), "a + 1")
+ .field("t", DataTypes.TIMESTAMP(3))
+ .primaryKey("ct1", new String[]{"a"})
+ .watermark("t", "t", DataTypes.TIMESTAMP(3))
+ .build();
+ exceptionRule.expect(IllegalArgumentException.class);
+ exceptionRule.expectMessage("It's illegal to project on a schema contains computed columns.");
+ int[][] projectedFields = {{1}};
+ TableSchemaUtils.projectSchema(schema, projectedFields);
+ }
+
+ {
+ TableSchema schema = TableSchema.builder()
+ .field("a", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING())))
+ .field("b", DataTypes.STRING())
+ .build();
+ exceptionRule.expect(IllegalArgumentException.class);
+ exceptionRule.expectMessage("Nested projection push down is not supported yet.");
+ int[][] projectedFields = {{0, 1}};
+ TableSchemaUtils.projectSchema(schema, projectedFields);
+ }
+ }
+
+ @Test
+ public void testProjectSchema() {
+ TableSchema schema = TableSchema.builder()
+ .field("a", DataTypes.INT().notNull())
+ .field("b", DataTypes.STRING())
+ .field("t", DataTypes.TIMESTAMP(3))
+ .primaryKey("a")
+ .watermark("t", "t", DataTypes.TIMESTAMP(3))
+ .build();
+
+ int[][] projectedFields = {{2}, {0}};
+ TableSchema projected = TableSchemaUtils.projectSchema(schema, projectedFields);
+ TableSchema expected = TableSchema.builder()
+ .field("t", DataTypes.TIMESTAMP(3))
+ .field("a", DataTypes.INT().notNull())
+ .build();
+ assertEquals(expected, projected);
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
deleted file mode 100644
index c5367ea..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.factories;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.collection.Seq;
-
-/**
- * Test implementation of {@link DynamicTableSourceFactory} that supports projection push down.
- */
-public class TestProjectableValuesTableFactory implements DynamicTableSourceFactory {
-
- // --------------------------------------------------------------------------------------------
- // Data Registration
- // --------------------------------------------------------------------------------------------
-
- private static final AtomicInteger idCounter = new AtomicInteger(0);
- private static final Map<String, Collection<Tuple2<RowKind, Row>>> registeredData = new HashMap<>();
-
- /**
- * Register the given data into the data factory context and return the data id.
- * The data id can be used as a reference to the registered data in data connector DDL.
- */
- public static String registerData(Collection<Row> data) {
- List<Tuple2<RowKind, Row>> dataWithKinds = new ArrayList<>();
- for (Row row : data) {
- dataWithKinds.add(Tuple2.of(RowKind.INSERT, row));
- }
- return registerChangelogData(dataWithKinds);
- }
-
- /**
- * Register the given data into the data factory context and return the data id.
- * The data id can be used as a reference to the registered data in data connector DDL.
- */
- public static String registerData(Seq<Row> data) {
- return registerData(JavaScalaConversionUtil.toJava(data));
- }
-
- /**
- * Register the given data with RowKind into the data factory context and return the data id.
- * The data id can be used as a reference to the registered data in data connector DDL.
- * TODO: remove this utility once Row supports RowKind.
- */
- public static String registerChangelogData(Collection<Tuple2<RowKind, Row>> data) {
- String id = String.valueOf(idCounter.incrementAndGet());
- registeredData.put(id, data);
- return id;
- }
-
- /**
- * Removes the registered data under the given data id.
- */
- public static void clearAllRegisteredData() {
- registeredData.clear();
- }
-
- // --------------------------------------------------------------------------------------------
- // Factory
- // --------------------------------------------------------------------------------------------
-
- private static final String IDENTIFIER = "projectable-values";
-
- private static final ConfigOption<String> DATA_ID = ConfigOptions
- .key("data-id")
- .stringType()
- .defaultValue(null);
-
- private static final ConfigOption<Boolean> BOUNDED = ConfigOptions
- .key("bounded")
- .booleanType()
- .defaultValue(false);
-
- private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions
- .key("changelog-mode")
- .stringType()
- .defaultValue("I"); // all available "I,UA,UB,D"
-
- private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions
- .key("runtime-source")
- .stringType()
- .defaultValue("SourceFunction"); // another is "InputFormat"
-
- private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions
- .key("nested-projection-supported")
- .booleanType()
- .defaultValue(false);
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public DynamicTableSource createDynamicTableSource(Context context) {
- FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- helper.validate();
- ChangelogMode changelogMode = parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
- String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
- boolean isBounded = helper.getOptions().get(BOUNDED);
- String dataId = helper.getOptions().get(DATA_ID);
- boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
-
- Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList());
- DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
- return new TestProjectableValuesTableSource(
- changelogMode,
- isBounded,
- runtimeSource,
- rowDataType,
- data,
- nestedProjectionSupported);
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- return new HashSet<>(Arrays.asList(
- DATA_ID,
- CHANGELOG_MODE,
- BOUNDED,
- RUNTIME_SOURCE,
- NESTED_PROJECTION_SUPPORTED));
- }
-
- private ChangelogMode parseChangelogMode(String string) {
- ChangelogMode.Builder builder = ChangelogMode.newBuilder();
- for (String split : string.split(",")) {
- switch (split.trim()) {
- case "I":
- builder.addContainedKind(RowKind.INSERT);
- break;
- case "UB":
- builder.addContainedKind(RowKind.UPDATE_BEFORE);
- break;
- case "UA":
- builder.addContainedKind(RowKind.UPDATE_AFTER);
- break;
- case "D":
- builder.addContainedKind(RowKind.DELETE);
- break;
- default:
- throw new IllegalArgumentException("Invalid ChangelogMode string: " + string);
- }
- }
- return builder.build();
- }
-
- // --------------------------------------------------------------------------------------------
- // Table source
- // --------------------------------------------------------------------------------------------
-
- /**
- * Values {@link DynamicTableSource} for testing.
- */
- private static class TestProjectableValuesTableSource implements ScanTableSource, SupportsProjectionPushDown {
-
- private final ChangelogMode changelogMode;
- private final boolean bounded;
- private final String runtimeSource;
- private DataType physicalRowDataType;
- private final Collection<Tuple2<RowKind, Row>> data;
- private final boolean nestedProjectionSupported;
- private int[] projectedFields = null;
-
- private TestProjectableValuesTableSource(
- ChangelogMode changelogMode,
- boolean bounded, String runtimeSource,
- DataType physicalRowDataType,
- Collection<Tuple2<RowKind, Row>> data,
- boolean nestedProjectionSupported) {
- this.changelogMode = changelogMode;
- this.bounded = bounded;
- this.runtimeSource = runtimeSource;
- this.physicalRowDataType = physicalRowDataType;
- this.data = data;
- this.nestedProjectionSupported = nestedProjectionSupported;
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- return changelogMode;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
- TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
- .createTypeInformation(physicalRowDataType)
- .createSerializer(new ExecutionConfig());
- DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
- Collection<RowData> values = convertToRowData(data, projectedFields, converter);
-
- if (runtimeSource.equals("SourceFunction")) {
- try {
- return SourceFunctionProvider.of(
- new FromElementsFunction<>(serializer, values),
- bounded);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else if (runtimeSource.equals("InputFormat")) {
- return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer));
- } else {
- throw new IllegalArgumentException("Unsupported runtime source class: " + runtimeSource);
- }
- }
-
- @Override
- public DynamicTableSource copy() {
- TestProjectableValuesTableSource newTableSource = new TestProjectableValuesTableSource(
- changelogMode, bounded, runtimeSource, physicalRowDataType, data, nestedProjectionSupported);
- newTableSource.projectedFields = projectedFields;
- return newTableSource;
- }
-
- @Override
- public String asSummaryString() {
- return "TestProjectableValues";
- }
-
- private static Collection<RowData> convertToRowData(
- Collection<Tuple2<RowKind, Row>> data,
- @Nullable int[] projectedFields,
- DataStructureConverter converter) {
- List<RowData> result = new ArrayList<>();
- for (Tuple2<RowKind, Row> value : data) {
- Row projectedRow;
- if (projectedFields == null) {
- projectedRow = value.f1;
- } else {
- Object[] newValues = new Object[projectedFields.length];
- for (int i = 0; i < projectedFields.length; ++i) {
- newValues[i] = value.f1.getField(projectedFields[i]);
- }
- projectedRow = Row.of(newValues);
- }
- RowData rowData = (RowData) converter.toInternal(projectedRow);
- if (rowData != null) {
- rowData.setRowKind(value.f0);
- result.add(rowData);
- }
- }
- return result;
- }
-
- @Override
- public boolean supportsNestedProjection() {
- return nestedProjectionSupported;
- }
-
- @Override
- public void applyProjection(int[][] projectedFields) {
- this.projectedFields = new int[projectedFields.length];
- FieldsDataType dataType = (FieldsDataType) physicalRowDataType;
- RowType rowType = ((RowType) physicalRowDataType.getLogicalType());
- DataTypes.Field[] fields = new DataTypes.Field[projectedFields.length];
- for (int i = 0; i < projectedFields.length; ++i) {
- int[] projection = projectedFields[i];
- Preconditions.checkArgument(projection.length == 1);
- int index = projection[0];
- this.projectedFields[i] = index;
- fields[i] = DataTypes.FIELD(rowType.getFieldNames().get(index), dataType.getChildren().get(index));
- }
- this.physicalRowDataType = DataTypes.ROW(fields);
- }
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index e889814..47dcf42 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -55,7 +56,6 @@ import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.Retra
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -235,6 +235,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
.booleanType()
.defaultValue(true);
+ private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions
+ .key("nested-projection-supported")
+ .booleanType()
+ .defaultValue(false);
+
@Override
public String factoryIdentifier() {
return IDENTIFIER;
@@ -251,18 +256,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
String sourceClass = helper.getOptions().get(TABLE_SOURCE_CLASS);
boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
+ boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
if (sourceClass.equals("DEFAULT")) {
Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList());
- DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new TestValuesTableSource(
+ physicalSchema,
changelogMode,
isBounded,
runtimeSource,
- rowDataType,
data,
isAsync,
- lookupFunctionClass);
+ lookupFunctionClass,
+ nestedProjectionSupported,
+ null);
} else {
try {
return InstantiationUtil.instantiate(
@@ -306,7 +314,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
ASYNC_ENABLED,
TABLE_SOURCE_CLASS,
SINK_INSERT_ONLY,
- RUNTIME_SINK));
+ RUNTIME_SINK,
+ NESTED_PROJECTION_SUPPORTED));
}
private ChangelogMode parseChangelogMode(String string) {
@@ -339,30 +348,37 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
/**
* Values {@link DynamicTableSource} for testing.
*/
- private static class TestValuesTableSource implements ScanTableSource, LookupTableSource {
+ private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+ private TableSchema physicalSchema;
private final ChangelogMode changelogMode;
private final boolean bounded;
private final String runtimeSource;
- private final DataType physicalRowDataType;
private final Collection<Tuple2<RowKind, Row>> data;
private final boolean isAsync;
private final @Nullable String lookupFunctionClass;
+ private final boolean nestedProjectionSupported;
+ private @Nullable int[] projectedFields;
private TestValuesTableSource(
+ TableSchema physicalSchema,
ChangelogMode changelogMode,
- boolean bounded, String runtimeSource,
- DataType physicalRowDataType,
+ boolean bounded,
+ String runtimeSource,
Collection<Tuple2<RowKind, Row>> data,
boolean isAsync,
- @Nullable String lookupFunctionClass) {
+ @Nullable String lookupFunctionClass,
+ boolean nestedProjectionSupported,
+ int[] projectedFields) {
+ this.physicalSchema = physicalSchema;
this.changelogMode = changelogMode;
this.bounded = bounded;
this.runtimeSource = runtimeSource;
- this.physicalRowDataType = physicalRowDataType;
this.data = data;
this.isAsync = isAsync;
this.lookupFunctionClass = lookupFunctionClass;
+ this.nestedProjectionSupported = nestedProjectionSupported;
+ this.projectedFields = projectedFields;
}
@Override
@@ -374,11 +390,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
- .createTypeInformation(physicalRowDataType)
+ .createTypeInformation(physicalSchema.toRowDataType())
.createSerializer(new ExecutionConfig());
- DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
+ DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalSchema.toRowDataType());
converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
- Collection<RowData> values = convertToRowData(data, converter);
+ Collection<RowData> values = convertToRowData(data, projectedFields, converter);
if (runtimeSource.equals("SourceFunction")) {
try {
@@ -438,8 +454,28 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
}
@Override
+ public boolean supportsNestedProjection() {
+ return nestedProjectionSupported;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields) {
+ this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+ this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
+ }
+
+ @Override
public DynamicTableSource copy() {
- return new TestValuesTableSource(changelogMode, bounded, runtimeSource, physicalRowDataType, data, isAsync, lookupFunctionClass);
+ return new TestValuesTableSource(
+ physicalSchema,
+ changelogMode,
+ bounded,
+ runtimeSource,
+ data,
+ isAsync,
+ lookupFunctionClass,
+ nestedProjectionSupported,
+ projectedFields);
}
@Override
@@ -449,10 +485,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
private static Collection<RowData> convertToRowData(
Collection<Tuple2<RowKind, Row>> data,
+ int[] projectedFields,
DataStructureConverter converter) {
List<RowData> result = new ArrayList<>();
for (Tuple2<RowKind, Row> value : data) {
- RowData rowData = (RowData) converter.toInternal(value.f1);
+ Row projectedRow;
+ if (projectedFields == null) {
+ projectedRow = value.f1;
+ } else {
+ Object[] newValues = new Object[projectedFields.length];
+ for (int i = 0; i < projectedFields.length; ++i) {
+ newValues[i] = value.f1.getField(projectedFields[i]);
+ }
+ projectedRow = Row.of(newValues);
+ }
+ RowData rowData = (RowData) converter.toInternal(projectedRow);
if (rowData != null) {
rowData.setRowKind(value.f0);
result.add(rowData);
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index 955e3d4..b8a5b0b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -54,7 +54,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
" b bigint,\n" +
" c string\n" +
") WITH (\n" +
- " 'connector' = 'projectable-values',\n" +
+ " 'connector' = 'values',\n" +
" 'bounded' = 'true'\n" +
")";
util().tableEnv().executeSql(ddl1);
@@ -66,7 +66,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
" c string,\n" +
" d as a + 1\n" +
") WITH (\n" +
- " 'connector' = 'projectable-values',\n" +
+ " 'connector' = 'values',\n" +
" 'bounded' = 'true'\n" +
")";
util().tableEnv().executeSql(ddl2);
@@ -92,7 +92,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
" nested row<name string, `value` int>,\n" +
" name string\n" +
") WITH (\n" +
- " 'connector' = 'projectable-values',\n" +
+ " 'connector' = 'values',\n" +
" 'nested-projection-supported' = '" + nestedProjectionSupported + "',\n" +
" 'bounded' = 'true'\n" +
")";
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7632f4b..498fb98 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,5 +14,4 @@
# limitations under the License.
org.apache.flink.table.planner.factories.TestValuesTableFactory
-org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 9c08f74..5e8c3fe 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -55,7 +55,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D])
+- Exchange(distribution=[single], changelogMode=[I,UB,UA])
+- Calc(select=[0 AS $f0], where=[>(a, 1)], changelogMode=[I,UB,UA])
- +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[ts, a, b], changelogMode=[I,UB,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a]]], fields=[a], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index 7b12fcf..9699a05 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase {
| b bigint,
| c varchar(32)
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'true'
|)
""".stripMargin
@@ -49,7 +49,7 @@ class TableSourceTest extends TableTestBase {
| nested row<name string, `value` int>,
| name string
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'true'
|)
|""".stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 5f15680..b5f252b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -37,7 +37,7 @@ class TableSourceTest extends TableTestBase {
| name varchar(32),
| watermark for rowtime as rowtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -57,7 +57,7 @@ class TableSourceTest extends TableTestBase {
| name varchar(32),
| watermark for rowtime as rowtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
| pTime as PROCTIME(),
| watermark for pTime as pTime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for ptime as ptime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -128,7 +128,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for rtime as rtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -148,7 +148,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for rtime as rtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -168,7 +168,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for ptime as ptime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -188,7 +188,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for rtime as rtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -208,7 +208,7 @@ class TableSourceTest extends TableTestBase {
| nested row<name string, `value` int>,
| name string
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'nested-projection-supported' = 'false',
| 'bounded' = 'false'
|)
@@ -235,7 +235,7 @@ class TableSourceTest extends TableTestBase {
| id int,
| name varchar(32)
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index 9d3ce05..0f0ead9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -39,7 +39,7 @@ class TableSourceTest extends TableTestBase {
| name varchar(32),
| watermark for rowtime as rowtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -61,7 +61,7 @@ class TableSourceTest extends TableTestBase {
| name varchar(32),
| watermark for rowtime as rowtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
| proctime as PROCTIME(),
| watermark for proctime as proctime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
| name varchar(32),
| proctime as PROCTIME()
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -132,7 +132,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for ptime as ptime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -154,7 +154,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for rtime as rtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -175,7 +175,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for rtime as rtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -196,7 +196,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for ptime as ptime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -217,7 +217,7 @@ class TableSourceTest extends TableTestBase {
| ptime as PROCTIME(),
| watermark for rtime as rtime
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'bounded' = 'false'
|)
""".stripMargin
@@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase {
| nested row<name string, `value` int>,
| name string
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'nested-projection-supported' = 'false',
| 'bounded' = 'false'
|)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index dda3bbe..2d799d1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
-import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule
import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil.parseFieldNames
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -42,7 +42,6 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
import org.apache.flink.table.planner.utils.DateTimeTestUtil._
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
import org.apache.flink.types.Row
-
import org.junit.Assert.assertEquals
import org.junit._
@@ -1250,7 +1249,7 @@ class CalcITCase extends BatchTestBase {
@Test
def testSimpleProject(): Unit = {
- val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+ val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3)
val ddl =
s"""
|CREATE TABLE SimpleTable (
@@ -1258,7 +1257,7 @@ class CalcITCase extends BatchTestBase {
| b bigint,
| c string
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'data-id' = '$myTableDataId',
| 'bounded' = 'true'
|)
@@ -1278,7 +1277,7 @@ class CalcITCase extends BatchTestBase {
row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"),
row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji")
)
- val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+ val myTableDataId = TestValuesTableFactory.registerData(data)
val ddl =
s"""
|CREATE TABLE NestedTable (
@@ -1288,7 +1287,7 @@ class CalcITCase extends BatchTestBase {
| nested row<name string, `value` int>,
| name string
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'nested-projection-supported' = 'false',
| 'data-id' = '$myTableDataId',
| 'bounded' = 'true'
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 7cc6e66..d32c9ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.table.api.internal.TableEnvironmentInternal
import org.apache.flink.table.api.scala._
import org.apache.flink.table.data.{GenericRowData, RowData}
-import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
@@ -290,7 +290,7 @@ class CalcITCase extends StreamingTestBase {
@Test
def testSimpleProject(): Unit = {
- val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+ val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3)
val ddl =
s"""
|CREATE TABLE SimpleTable (
@@ -298,7 +298,7 @@ class CalcITCase extends StreamingTestBase {
| b bigint,
| c string
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'data-id' = '$myTableDataId',
| 'bounded' = 'true'
|)
@@ -321,7 +321,7 @@ class CalcITCase extends StreamingTestBase {
row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"),
row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji")
)
- val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+ val myTableDataId = TestValuesTableFactory.registerData(data)
val ddl =
s"""
|CREATE TABLE NestedTable (
@@ -331,7 +331,7 @@ class CalcITCase extends StreamingTestBase {
| nested row<name string, `value` int>,
| name string
|) WITH (
- | 'connector' = 'projectable-values',
+ | 'connector' = 'values',
| 'nested-projection-supported' = 'false',
| 'data-id' = '$myTableDataId',
| 'bounded' = 'true'
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 5b59ffb..403a240 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.writer.BinaryRowWriter
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM
@@ -81,7 +81,6 @@ class BatchTestBase extends BatchAbstractTestBase {
@After
def after(): Unit = {
TestValuesTableFactory.clearAllData()
- TestProjectableValuesTableFactory.clearAllRegisteredData()
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index 3b6bbe4..3fffbc0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.{EnvironmentSettings, ImplicitExpressionConversions}
-import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory}
+import org.apache.flink.table.api.ImplicitExpressionConversions
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
@@ -62,7 +62,6 @@ class StreamingTestBase extends AbstractTestBase {
def after(): Unit = {
StreamTestSink.clear()
TestValuesTableFactory.clearAllData()
- TestProjectableValuesTableFactory.clearAllRegisteredData()
}
/**