You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 02:12:50 UTC

[flink] 01/03: [hotfix][table] Improve testing implementation for the new projection push down

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71e64989ee0b7109271ced5e1af03cc3694783e9
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:40:17 2020 +0800

    [hotfix][table] Improve testing implementation for the new projection push down
---
 .../apache/flink/table/utils/TableSchemaUtils.java |  20 ++
 .../flink/table/utils/TableSchemaUtilsTest.java    |  48 +++
 .../TestProjectableValuesTableFactory.java         | 326 ---------------------
 .../planner/factories/TestValuesTableFactory.java  |  79 ++++-
 .../PushProjectIntoTableSourceScanRuleTest.java    |   6 +-
 .../org.apache.flink.table.factories.Factory       |   1 -
 .../planner/plan/stream/sql/TableScanTest.xml      |   2 +-
 .../planner/plan/batch/sql/TableSourceTest.scala   |   4 +-
 .../planner/plan/stream/sql/TableSourceTest.scala  |  20 +-
 .../plan/stream/table/TableSourceTest.scala        |  20 +-
 .../planner/runtime/batch/sql/CalcITCase.scala     |  11 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |  10 +-
 .../planner/runtime/utils/BatchTestBase.scala      |   3 +-
 .../planner/runtime/utils/StreamingTestBase.scala  |   5 +-
 14 files changed, 170 insertions(+), 385 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 4863cb2..67ee125 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Utilities to {@link TableSchema}.
  */
@@ -62,6 +64,24 @@ public class TableSchemaUtils {
 	}
 
 	/**
+	 * Creates a new {@link TableSchema} with the projected fields from another {@link TableSchema}.
+	 * The new {@link TableSchema} doesn't contain any primary key or watermark information.
+	 *
+	 * @see org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
+	 */
+	public static TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
+		checkArgument(!containsGeneratedColumns(tableSchema), "It's illegal to project on a schema contains computed columns.");
+		TableSchema.Builder schemaBuilder = TableSchema.builder();
+		List<TableColumn> tableColumns = tableSchema.getTableColumns();
+		for (int[] fieldPath : projectedFields) {
+			checkArgument(fieldPath.length == 1, "Nested projection push down is not supported yet.");
+			TableColumn column = tableColumns.get(fieldPath[0]);
+			schemaBuilder.field(column.getName(), column.getType());
+		}
+		return schemaBuilder.build();
+	}
+
+	/**
 	 * Returns true if there are any generated columns in the given {@link TableColumn}.
 	 */
 	public static boolean containsGeneratedColumns(TableSchema schema) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
index e96ddd9..3e760b1 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
@@ -70,4 +70,52 @@ public class TableSchemaUtilsTest {
 		exceptionRule.expectMessage("Constraint ct2 to drop does not exist");
 		TableSchemaUtils.dropConstraint(oriSchema, "ct2");
 	}
+
+	@Test
+	public void testInvalidProjectSchema() {
+		{
+			TableSchema schema = TableSchema.builder()
+				.field("a", DataTypes.INT().notNull())
+				.field("b", DataTypes.STRING())
+				.field("c", DataTypes.INT(), "a + 1")
+				.field("t", DataTypes.TIMESTAMP(3))
+				.primaryKey("ct1", new String[]{"a"})
+				.watermark("t", "t", DataTypes.TIMESTAMP(3))
+				.build();
+			exceptionRule.expect(IllegalArgumentException.class);
+			exceptionRule.expectMessage("It's illegal to project on a schema contains computed columns.");
+			int[][] projectedFields = {{1}};
+			TableSchemaUtils.projectSchema(schema, projectedFields);
+		}
+
+		{
+			TableSchema schema = TableSchema.builder()
+				.field("a", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING())))
+				.field("b", DataTypes.STRING())
+				.build();
+			exceptionRule.expect(IllegalArgumentException.class);
+			exceptionRule.expectMessage("Nested projection push down is not supported yet.");
+			int[][] projectedFields = {{0, 1}};
+			TableSchemaUtils.projectSchema(schema, projectedFields);
+		}
+	}
+
+	@Test
+	public void testProjectSchema() {
+		TableSchema schema = TableSchema.builder()
+			.field("a", DataTypes.INT().notNull())
+			.field("b", DataTypes.STRING())
+			.field("t", DataTypes.TIMESTAMP(3))
+			.primaryKey("a")
+			.watermark("t", "t", DataTypes.TIMESTAMP(3))
+			.build();
+
+		int[][] projectedFields = {{2}, {0}};
+		TableSchema projected = TableSchemaUtils.projectSchema(schema, projectedFields);
+		TableSchema expected = TableSchema.builder()
+			.field("t", DataTypes.TIMESTAMP(3))
+			.field("a", DataTypes.INT().notNull())
+			.build();
+		assertEquals(expected, projected);
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
deleted file mode 100644
index c5367ea..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.factories;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.collection.Seq;
-
-/**
- * Test implementation of {@link DynamicTableSourceFactory} that supports projection push down.
- */
-public class TestProjectableValuesTableFactory implements DynamicTableSourceFactory {
-
-	// --------------------------------------------------------------------------------------------
-	// Data Registration
-	// --------------------------------------------------------------------------------------------
-
-	private static final AtomicInteger idCounter = new AtomicInteger(0);
-	private static final Map<String, Collection<Tuple2<RowKind, Row>>> registeredData = new HashMap<>();
-
-	/**
-	 * Register the given data into the data factory context and return the data id.
-	 * The data id can be used as a reference to the registered data in data connector DDL.
-	 */
-	public static String registerData(Collection<Row> data) {
-		List<Tuple2<RowKind, Row>> dataWithKinds = new ArrayList<>();
-		for (Row row : data) {
-			dataWithKinds.add(Tuple2.of(RowKind.INSERT, row));
-		}
-		return registerChangelogData(dataWithKinds);
-	}
-
-	/**
-	 * Register the given data into the data factory context and return the data id.
-	 * The data id can be used as a reference to the registered data in data connector DDL.
-	 */
-	public static String registerData(Seq<Row> data) {
-		return registerData(JavaScalaConversionUtil.toJava(data));
-	}
-
-	/**
-	 * Register the given data with RowKind into the data factory context and return the data id.
-	 * The data id can be used as a reference to the registered data in data connector DDL.
-	 * TODO: remove this utility once Row supports RowKind.
-	 */
-	public static String registerChangelogData(Collection<Tuple2<RowKind, Row>> data) {
-		String id = String.valueOf(idCounter.incrementAndGet());
-		registeredData.put(id, data);
-		return id;
-	}
-
-	/**
-	 * Removes the registered data under the given data id.
-	 */
-	public static void clearAllRegisteredData() {
-		registeredData.clear();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Factory
-	// --------------------------------------------------------------------------------------------
-
-	private static final String IDENTIFIER = "projectable-values";
-
-	private static final ConfigOption<String> DATA_ID = ConfigOptions
-			.key("data-id")
-			.stringType()
-			.defaultValue(null);
-
-	private static final ConfigOption<Boolean> BOUNDED = ConfigOptions
-			.key("bounded")
-			.booleanType()
-			.defaultValue(false);
-
-	private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions
-			.key("changelog-mode")
-			.stringType()
-			.defaultValue("I"); // all available "I,UA,UB,D"
-
-	private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions
-			.key("runtime-source")
-			.stringType()
-			.defaultValue("SourceFunction"); // another is "InputFormat"
-
-	private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions
-			.key("nested-projection-supported")
-			.booleanType()
-			.defaultValue(false);
-
-	@Override
-	public String factoryIdentifier() {
-		return IDENTIFIER;
-	}
-
-	@Override
-	public DynamicTableSource createDynamicTableSource(Context context) {
-		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-		helper.validate();
-		ChangelogMode changelogMode = parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
-		String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
-		boolean isBounded = helper.getOptions().get(BOUNDED);
-		String dataId = helper.getOptions().get(DATA_ID);
-		boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
-
-		Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList());
-		DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		return new TestProjectableValuesTableSource(
-				changelogMode,
-				isBounded,
-				runtimeSource,
-				rowDataType,
-				data,
-				nestedProjectionSupported);
-	}
-
-	@Override
-	public Set<ConfigOption<?>> requiredOptions() {
-		return Collections.emptySet();
-	}
-
-	@Override
-	public Set<ConfigOption<?>> optionalOptions() {
-		return new HashSet<>(Arrays.asList(
-				DATA_ID,
-				CHANGELOG_MODE,
-				BOUNDED,
-				RUNTIME_SOURCE,
-				NESTED_PROJECTION_SUPPORTED));
-	}
-
-	private ChangelogMode parseChangelogMode(String string) {
-		ChangelogMode.Builder builder = ChangelogMode.newBuilder();
-		for (String split : string.split(",")) {
-			switch (split.trim()) {
-				case "I":
-					builder.addContainedKind(RowKind.INSERT);
-					break;
-				case "UB":
-					builder.addContainedKind(RowKind.UPDATE_BEFORE);
-					break;
-				case "UA":
-					builder.addContainedKind(RowKind.UPDATE_AFTER);
-					break;
-				case "D":
-					builder.addContainedKind(RowKind.DELETE);
-					break;
-				default:
-					throw new IllegalArgumentException("Invalid ChangelogMode string: " + string);
-			}
-		}
-		return builder.build();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Table source
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Values {@link DynamicTableSource} for testing.
-	 */
-	private static class TestProjectableValuesTableSource implements ScanTableSource, SupportsProjectionPushDown {
-
-		private final ChangelogMode changelogMode;
-		private final boolean bounded;
-		private final String runtimeSource;
-		private DataType physicalRowDataType;
-		private final Collection<Tuple2<RowKind, Row>> data;
-		private final boolean nestedProjectionSupported;
-		private int[] projectedFields = null;
-
-		private TestProjectableValuesTableSource(
-				ChangelogMode changelogMode,
-				boolean bounded, String runtimeSource,
-				DataType physicalRowDataType,
-				Collection<Tuple2<RowKind, Row>> data,
-				boolean nestedProjectionSupported) {
-			this.changelogMode = changelogMode;
-			this.bounded = bounded;
-			this.runtimeSource = runtimeSource;
-			this.physicalRowDataType = physicalRowDataType;
-			this.data = data;
-			this.nestedProjectionSupported = nestedProjectionSupported;
-		}
-
-		@Override
-		public ChangelogMode getChangelogMode() {
-			return changelogMode;
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
-			TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
-					.createTypeInformation(physicalRowDataType)
-					.createSerializer(new ExecutionConfig());
-			DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
-			Collection<RowData> values = convertToRowData(data, projectedFields, converter);
-
-			if (runtimeSource.equals("SourceFunction")) {
-				try {
-					return SourceFunctionProvider.of(
-							new FromElementsFunction<>(serializer, values),
-							bounded);
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
-			} else if (runtimeSource.equals("InputFormat")) {
-				return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer));
-			} else {
-				throw new IllegalArgumentException("Unsupported runtime source class: " + runtimeSource);
-			}
-		}
-
-		@Override
-		public DynamicTableSource copy() {
-			TestProjectableValuesTableSource newTableSource = new TestProjectableValuesTableSource(
-					changelogMode, bounded, runtimeSource, physicalRowDataType, data, nestedProjectionSupported);
-			newTableSource.projectedFields = projectedFields;
-			return newTableSource;
-		}
-
-		@Override
-		public String asSummaryString() {
-			return "TestProjectableValues";
-		}
-
-		private static Collection<RowData> convertToRowData(
-				Collection<Tuple2<RowKind, Row>> data,
-				@Nullable int[] projectedFields,
-				DataStructureConverter converter) {
-			List<RowData> result = new ArrayList<>();
-			for (Tuple2<RowKind, Row> value : data) {
-				Row projectedRow;
-				if (projectedFields == null) {
-					projectedRow = value.f1;
-				} else {
-					Object[] newValues = new Object[projectedFields.length];
-					for (int i = 0; i < projectedFields.length; ++i) {
-						newValues[i] = value.f1.getField(projectedFields[i]);
-					}
-					projectedRow = Row.of(newValues);
-				}
-				RowData rowData = (RowData) converter.toInternal(projectedRow);
-				if (rowData != null) {
-					rowData.setRowKind(value.f0);
-					result.add(rowData);
-				}
-			}
-			return result;
-		}
-
-		@Override
-		public boolean supportsNestedProjection() {
-			return nestedProjectionSupported;
-		}
-
-		@Override
-		public void applyProjection(int[][] projectedFields) {
-			this.projectedFields = new int[projectedFields.length];
-			FieldsDataType dataType = (FieldsDataType) physicalRowDataType;
-			RowType rowType = ((RowType) physicalRowDataType.getLogicalType());
-			DataTypes.Field[] fields = new DataTypes.Field[projectedFields.length];
-			for (int i = 0; i < projectedFields.length; ++i) {
-				int[] projection = projectedFields[i];
-				Preconditions.checkArgument(projection.length == 1);
-				int index = projection[0];
-				this.projectedFields[i] = index;
-				fields[i] = DataTypes.FIELD(rowType.getFieldNames().get(index), dataType.getChildren().get(index));
-			}
-			this.physicalRowDataType = DataTypes.ROW(fields);
-		}
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index e889814..47dcf42 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -55,7 +56,6 @@ import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.Retra
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -235,6 +235,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		.booleanType()
 		.defaultValue(true);
 
+	private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions
+		.key("nested-projection-supported")
+		.booleanType()
+		.defaultValue(false);
+
 	@Override
 	public String factoryIdentifier() {
 		return IDENTIFIER;
@@ -251,18 +256,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		String sourceClass = helper.getOptions().get(TABLE_SOURCE_CLASS);
 		boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
 		String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
+		boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
 
 		if (sourceClass.equals("DEFAULT")) {
 			Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList());
-			DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+			TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 			return new TestValuesTableSource(
+				physicalSchema,
 				changelogMode,
 				isBounded,
 				runtimeSource,
-				rowDataType,
 				data,
 				isAsync,
-				lookupFunctionClass);
+				lookupFunctionClass,
+				nestedProjectionSupported,
+				null);
 		} else {
 			try {
 				return InstantiationUtil.instantiate(
@@ -306,7 +314,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 			ASYNC_ENABLED,
 			TABLE_SOURCE_CLASS,
 			SINK_INSERT_ONLY,
-			RUNTIME_SINK));
+			RUNTIME_SINK,
+			NESTED_PROJECTION_SUPPORTED));
 	}
 
 	private ChangelogMode parseChangelogMode(String string) {
@@ -339,30 +348,37 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
+		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
-		private final DataType physicalRowDataType;
 		private final Collection<Tuple2<RowKind, Row>> data;
 		private final boolean isAsync;
 		private final @Nullable String lookupFunctionClass;
+		private final boolean nestedProjectionSupported;
+		private @Nullable int[] projectedFields;
 
 		private TestValuesTableSource(
+				TableSchema physicalSchema,
 				ChangelogMode changelogMode,
-				boolean bounded, String runtimeSource,
-				DataType physicalRowDataType,
+				boolean bounded,
+				String runtimeSource,
 				Collection<Tuple2<RowKind, Row>> data,
 				boolean isAsync,
-				@Nullable String lookupFunctionClass) {
+				@Nullable String lookupFunctionClass,
+				boolean nestedProjectionSupported,
+				int[] projectedFields) {
+			this.physicalSchema = physicalSchema;
 			this.changelogMode = changelogMode;
 			this.bounded = bounded;
 			this.runtimeSource = runtimeSource;
-			this.physicalRowDataType = physicalRowDataType;
 			this.data = data;
 			this.isAsync = isAsync;
 			this.lookupFunctionClass = lookupFunctionClass;
+			this.nestedProjectionSupported = nestedProjectionSupported;
+			this.projectedFields = projectedFields;
 		}
 
 		@Override
@@ -374,11 +390,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		@Override
 		public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
 			TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
-				.createTypeInformation(physicalRowDataType)
+				.createTypeInformation(physicalSchema.toRowDataType())
 				.createSerializer(new ExecutionConfig());
-			DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
+			DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalSchema.toRowDataType());
 			converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
-			Collection<RowData> values = convertToRowData(data, converter);
+			Collection<RowData> values = convertToRowData(data, projectedFields, converter);
 
 			if (runtimeSource.equals("SourceFunction")) {
 				try {
@@ -438,8 +454,28 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		}
 
 		@Override
+		public boolean supportsNestedProjection() {
+			return nestedProjectionSupported;
+		}
+
+		@Override
+		public void applyProjection(int[][] projectedFields) {
+			this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
+		}
+
+		@Override
 		public DynamicTableSource copy() {
-			return new TestValuesTableSource(changelogMode, bounded, runtimeSource, physicalRowDataType, data, isAsync, lookupFunctionClass);
+			return new TestValuesTableSource(
+				physicalSchema,
+				changelogMode,
+				bounded,
+				runtimeSource,
+				data,
+				isAsync,
+				lookupFunctionClass,
+				nestedProjectionSupported,
+				projectedFields);
 		}
 
 		@Override
@@ -449,10 +485,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 
 		private static Collection<RowData> convertToRowData(
 				Collection<Tuple2<RowKind, Row>> data,
+				int[] projectedFields,
 				DataStructureConverter converter) {
 			List<RowData> result = new ArrayList<>();
 			for (Tuple2<RowKind, Row> value : data) {
-				RowData rowData = (RowData) converter.toInternal(value.f1);
+				Row projectedRow;
+				if (projectedFields == null) {
+					projectedRow = value.f1;
+				} else {
+					Object[] newValues = new Object[projectedFields.length];
+					for (int i = 0; i < projectedFields.length; ++i) {
+						newValues[i] = value.f1.getField(projectedFields[i]);
+					}
+					projectedRow = Row.of(newValues);
+				}
+				RowData rowData = (RowData) converter.toInternal(projectedRow);
 				if (rowData != null) {
 					rowData.setRowKind(value.f0);
 					result.add(rowData);
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index 955e3d4..b8a5b0b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -54,7 +54,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
 						"  b bigint,\n" +
 						"  c string\n" +
 						") WITH (\n" +
-						" 'connector' = 'projectable-values',\n" +
+						" 'connector' = 'values',\n" +
 						" 'bounded' = 'true'\n" +
 						")";
 		util().tableEnv().executeSql(ddl1);
@@ -66,7 +66,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
 						"  c string,\n" +
 						"  d as a + 1\n" +
 						") WITH (\n" +
-						" 'connector' = 'projectable-values',\n" +
+						" 'connector' = 'values',\n" +
 						" 'bounded' = 'true'\n" +
 						")";
 		util().tableEnv().executeSql(ddl2);
@@ -92,7 +92,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
 						"  nested row<name string, `value` int>,\n" +
 						"  name string\n" +
 						") WITH (\n" +
-						" 'connector' = 'projectable-values',\n" +
+						" 'connector' = 'values',\n" +
 						" 'nested-projection-supported' = '" + nestedProjectionSupported + "',\n" +
 						"  'bounded' = 'true'\n" +
 						")";
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7632f4b..498fb98 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,5 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.table.planner.factories.TestValuesTableFactory
-org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
 org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 9c08f74..5e8c3fe 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -55,7 +55,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
 GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D])
 +- Exchange(distribution=[single], changelogMode=[I,UB,UA])
    +- Calc(select=[0 AS $f0], where=[>(a, 1)], changelogMode=[I,UB,UA])
-      +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[ts, a, b], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a]]], fields=[a], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index 7b12fcf..9699a05 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase {
          |  b bigint,
          |  c varchar(32)
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'true'
          |)
        """.stripMargin
@@ -49,7 +49,7 @@ class TableSourceTest extends TableTestBase {
         |  nested row<name string, `value` int>,
         |  name string
         |) WITH (
-        | 'connector' = 'projectable-values',
+        | 'connector' = 'values',
         |  'bounded' = 'true'
         |)
         |""".stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 5f15680..b5f252b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -37,7 +37,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -57,7 +57,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
          |  pTime as PROCTIME(),
          |  watermark for pTime as pTime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -128,7 +128,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -148,7 +148,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -168,7 +168,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -188,7 +188,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -208,7 +208,7 @@ class TableSourceTest extends TableTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'bounded' = 'false'
          |)
@@ -235,7 +235,7 @@ class TableSourceTest extends TableTestBase {
          |  id int,
          |  name varchar(32)
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index 9d3ce05..0f0ead9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -39,7 +39,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -61,7 +61,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
          |  proctime as PROCTIME(),
          |  watermark for proctime as proctime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  proctime as PROCTIME()
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -132,7 +132,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -154,7 +154,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -175,7 +175,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -196,7 +196,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -217,7 +217,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'bounded' = 'false'
          |)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index dda3bbe..2d799d1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
 import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
 import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
-import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule
 import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil.parseFieldNames
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -42,7 +42,6 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
 import org.apache.flink.types.Row
-
 import org.junit.Assert.assertEquals
 import org.junit._
 
@@ -1250,7 +1249,7 @@ class CalcITCase extends BatchTestBase {
 
   @Test
   def testSimpleProject(): Unit = {
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+    val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3)
     val ddl =
       s"""
          |CREATE TABLE SimpleTable (
@@ -1258,7 +1257,7 @@ class CalcITCase extends BatchTestBase {
          |  b bigint,
          |  c string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
          |)
@@ -1278,7 +1277,7 @@ class CalcITCase extends BatchTestBase {
       row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"),
       row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji")
     )
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+    val myTableDataId = TestValuesTableFactory.registerData(data)
     val ddl =
       s"""
          |CREATE TABLE NestedTable (
@@ -1288,7 +1287,7 @@ class CalcITCase extends BatchTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 7cc6e66..d32c9ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.typeutils.Types
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.data.{GenericRowData, RowData}
-import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
@@ -290,7 +290,7 @@ class CalcITCase extends StreamingTestBase {
 
   @Test
   def testSimpleProject(): Unit = {
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+    val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3)
     val ddl =
       s"""
          |CREATE TABLE SimpleTable (
@@ -298,7 +298,7 @@ class CalcITCase extends StreamingTestBase {
          |  b bigint,
          |  c string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
          |)
@@ -321,7 +321,7 @@ class CalcITCase extends StreamingTestBase {
       row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"),
       row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji")
     )
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+    val myTableDataId = TestValuesTableFactory.registerData(data)
     val ddl =
       s"""
          |CREATE TABLE NestedTable (
@@ -331,7 +331,7 @@ class CalcITCase extends StreamingTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 5b59ffb..403a240 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.writer.BinaryRowWriter
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM
@@ -81,7 +81,6 @@ class BatchTestBase extends BatchAbstractTestBase {
   @After
   def after(): Unit = {
     TestValuesTableFactory.clearAllData()
-    TestProjectableValuesTableFactory.clearAllRegisteredData()
   }
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index 3b6bbe4..3fffbc0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.{EnvironmentSettings, ImplicitExpressionConversions}
-import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory}
+import org.apache.flink.table.api.ImplicitExpressionConversions
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.api.{EnvironmentSettings, Table}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
@@ -62,7 +62,6 @@ class StreamingTestBase extends AbstractTestBase {
   def after(): Unit = {
     StreamTestSink.clear()
     TestValuesTableFactory.clearAllData()
-    TestProjectableValuesTableFactory.clearAllRegisteredData()
   }
 
   /**