Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 02:12:49 UTC

[flink] branch master updated (51a0d42 -> 73520ca)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 51a0d42  [FLINK-12030][connector/kafka] Check the topic existence after topic creation using KafkaConsumer.
     new 71e6498  [hotfix][table] Improve testing implementation for the new projection push down
     new 57e4748  [FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source
     new 73520ca  [FLINK-17798][connector/jdbc] Align the behavior between the new and legacy JDBC table source

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-connectors/flink-connector-hbase/pom.xml     |   8 +
 .../connector/hbase/HBaseDynamicTableFactory.java  |  43 ++-
 .../hbase/source/HBaseDynamicTableSource.java      |  21 +-
 .../flink/connector/hbase/util/HBaseSerde.java     |  24 +-
 .../connector/hbase/util/HBaseTableSchema.java     |  12 -
 .../flink/connector/hbase/HBaseTablePlanTest.java  | 127 ++++++++
 .../flink/connector/hbase/HBaseTablePlanTest.xml   |  14 +-
 flink-connectors/flink-connector-jdbc/pom.xml      |   2 +-
 .../jdbc/table/JdbcDynamicTableSource.java         |  48 +--
 .../table/JdbcDynamicTableSourceSinkFactory.java   |   7 +-
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   |  45 +--
 ...ctionITCase.java => JdbcLookupTableITCase.java} |  43 +--
 .../connector/jdbc/table/JdbcTablePlanTest.java    |  54 ++++
 .../jdbc/table/JdbcTableSourceITCase.java          |   1 -
 .../connector/jdbc/table/JdbcTablePlanTest.xml     |  13 +-
 .../apache/flink/table/utils/TableSchemaUtils.java |  20 ++
 .../flink/table/utils/TableSchemaUtilsTest.java    |  48 +++
 .../TestProjectableValuesTableFactory.java         | 326 ---------------------
 .../planner/factories/TestValuesTableFactory.java  |  79 ++++-
 .../PushProjectIntoTableSourceScanRuleTest.java    |   6 +-
 .../org.apache.flink.table.factories.Factory       |   1 -
 .../planner/plan/stream/sql/TableScanTest.xml      |   2 +-
 .../planner/plan/batch/sql/TableSourceTest.scala   |   4 +-
 .../planner/plan/stream/sql/TableSourceTest.scala  |  20 +-
 .../plan/stream/table/TableSourceTest.scala        |  20 +-
 .../planner/runtime/batch/sql/CalcITCase.scala     |  11 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |  10 +-
 .../planner/runtime/utils/BatchTestBase.scala      |   3 +-
 .../planner/runtime/utils/StreamingTestBase.scala  |   5 +-
 29 files changed, 524 insertions(+), 493 deletions(-)
 create mode 100644 flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
 copy flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/ColumnFunctionsTest.xml => flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml (67%)
 rename flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/{JdbcLookupFunctionITCase.java => JdbcLookupTableITCase.java} (86%)
 create mode 100644 flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
 copy flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/ColumnFunctionsTest.xml => flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml (67%)
 delete mode 100644 flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java


[flink] 01/03: [hotfix][table] Improve testing implementation for the new projection push down

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71e64989ee0b7109271ced5e1af03cc3694783e9
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:40:17 2020 +0800

    [hotfix][table] Improve testing implementation for the new projection push down
---
 .../apache/flink/table/utils/TableSchemaUtils.java |  20 ++
 .../flink/table/utils/TableSchemaUtilsTest.java    |  48 +++
 .../TestProjectableValuesTableFactory.java         | 326 ---------------------
 .../planner/factories/TestValuesTableFactory.java  |  79 ++++-
 .../PushProjectIntoTableSourceScanRuleTest.java    |   6 +-
 .../org.apache.flink.table.factories.Factory       |   1 -
 .../planner/plan/stream/sql/TableScanTest.xml      |   2 +-
 .../planner/plan/batch/sql/TableSourceTest.scala   |   4 +-
 .../planner/plan/stream/sql/TableSourceTest.scala  |  20 +-
 .../plan/stream/table/TableSourceTest.scala        |  20 +-
 .../planner/runtime/batch/sql/CalcITCase.scala     |  11 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |  10 +-
 .../planner/runtime/utils/BatchTestBase.scala      |   3 +-
 .../planner/runtime/utils/StreamingTestBase.scala  |   5 +-
 14 files changed, 170 insertions(+), 385 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 4863cb2..67ee125 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Utilities to {@link TableSchema}.
  */
@@ -62,6 +64,24 @@ public class TableSchemaUtils {
 	}
 
 	/**
+	 * Creates a new {@link TableSchema} with the projected fields from another {@link TableSchema}.
+	 * The new {@link TableSchema} doesn't contain any primary key or watermark information.
+	 *
+	 * @see org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
+	 */
+	public static TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
+		checkArgument(!containsGeneratedColumns(tableSchema), "It's illegal to project on a schema that contains computed columns.");
+		TableSchema.Builder schemaBuilder = TableSchema.builder();
+		List<TableColumn> tableColumns = tableSchema.getTableColumns();
+		for (int[] fieldPath : projectedFields) {
+			checkArgument(fieldPath.length == 1, "Nested projection push down is not supported yet.");
+			TableColumn column = tableColumns.get(fieldPath[0]);
+			schemaBuilder.field(column.getName(), column.getType());
+		}
+		return schemaBuilder.build();
+	}
+
+	/**
 	 * Returns true if there are any generated columns in the given {@link TableColumn}.
 	 */
 	public static boolean containsGeneratedColumns(TableSchema schema) {
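
For readers skimming the diff: the int[][] argument follows the SupportsProjectionPushDown convention, where each inner array is a field path into a possibly nested row type. A minimal sketch of the two shapes, assuming a hypothetical schema ROW<id INT, nested ROW<name STRING>> (illustrative only, not part of the commit):

    // Top-level projection: select field 1, then field 0 (paths of length 1).
    int[][] topLevel = {{1}, {0}};

    // Nested projection: select nested.name via the path {1, 0}; projectSchema
    // rejects paths longer than 1 ("Nested projection push down is not supported yet.").
    int[][] nested = {{1, 0}};
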
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
index e96ddd9..3e760b1 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
@@ -70,4 +70,52 @@ public class TableSchemaUtilsTest {
 		exceptionRule.expectMessage("Constraint ct2 to drop does not exist");
 		TableSchemaUtils.dropConstraint(oriSchema, "ct2");
 	}
+
+	@Test
+	public void testInvalidProjectSchema() {
+		{
+			TableSchema schema = TableSchema.builder()
+				.field("a", DataTypes.INT().notNull())
+				.field("b", DataTypes.STRING())
+				.field("c", DataTypes.INT(), "a + 1")
+				.field("t", DataTypes.TIMESTAMP(3))
+				.primaryKey("ct1", new String[]{"a"})
+				.watermark("t", "t", DataTypes.TIMESTAMP(3))
+				.build();
+			exceptionRule.expect(IllegalArgumentException.class);
+			exceptionRule.expectMessage("It's illegal to project on a schema that contains computed columns.");
+			int[][] projectedFields = {{1}};
+			TableSchemaUtils.projectSchema(schema, projectedFields);
+		}
+
+		{
+			TableSchema schema = TableSchema.builder()
+				.field("a", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING())))
+				.field("b", DataTypes.STRING())
+				.build();
+			exceptionRule.expect(IllegalArgumentException.class);
+			exceptionRule.expectMessage("Nested projection push down is not supported yet.");
+			int[][] projectedFields = {{0, 1}};
+			TableSchemaUtils.projectSchema(schema, projectedFields);
+		}
+	}
+
+	@Test
+	public void testProjectSchema() {
+		TableSchema schema = TableSchema.builder()
+			.field("a", DataTypes.INT().notNull())
+			.field("b", DataTypes.STRING())
+			.field("t", DataTypes.TIMESTAMP(3))
+			.primaryKey("a")
+			.watermark("t", "t", DataTypes.TIMESTAMP(3))
+			.build();
+
+		int[][] projectedFields = {{2}, {0}};
+		TableSchema projected = TableSchemaUtils.projectSchema(schema, projectedFields);
+		TableSchema expected = TableSchema.builder()
+			.field("t", DataTypes.TIMESTAMP(3))
+			.field("a", DataTypes.INT().notNull())
+			.build();
+		assertEquals(expected, projected);
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
deleted file mode 100644
index c5367ea..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.factories;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.collection.Seq;
-
-/**
- * Test implementation of {@link DynamicTableSourceFactory} that supports projection push down.
- */
-public class TestProjectableValuesTableFactory implements DynamicTableSourceFactory {
-
-	// --------------------------------------------------------------------------------------------
-	// Data Registration
-	// --------------------------------------------------------------------------------------------
-
-	private static final AtomicInteger idCounter = new AtomicInteger(0);
-	private static final Map<String, Collection<Tuple2<RowKind, Row>>> registeredData = new HashMap<>();
-
-	/**
-	 * Register the given data into the data factory context and return the data id.
-	 * The data id can be used as a reference to the registered data in data connector DDL.
-	 */
-	public static String registerData(Collection<Row> data) {
-		List<Tuple2<RowKind, Row>> dataWithKinds = new ArrayList<>();
-		for (Row row : data) {
-			dataWithKinds.add(Tuple2.of(RowKind.INSERT, row));
-		}
-		return registerChangelogData(dataWithKinds);
-	}
-
-	/**
-	 * Register the given data into the data factory context and return the data id.
-	 * The data id can be used as a reference to the registered data in data connector DDL.
-	 */
-	public static String registerData(Seq<Row> data) {
-		return registerData(JavaScalaConversionUtil.toJava(data));
-	}
-
-	/**
-	 * Register the given data with RowKind into the data factory context and return the data id.
-	 * The data id can be used as a reference to the registered data in data connector DDL.
-	 * TODO: remove this utility once Row supports RowKind.
-	 */
-	public static String registerChangelogData(Collection<Tuple2<RowKind, Row>> data) {
-		String id = String.valueOf(idCounter.incrementAndGet());
-		registeredData.put(id, data);
-		return id;
-	}
-
-	/**
-	 * Removes the registered data under the given data id.
-	 */
-	public static void clearAllRegisteredData() {
-		registeredData.clear();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Factory
-	// --------------------------------------------------------------------------------------------
-
-	private static final String IDENTIFIER = "projectable-values";
-
-	private static final ConfigOption<String> DATA_ID = ConfigOptions
-			.key("data-id")
-			.stringType()
-			.defaultValue(null);
-
-	private static final ConfigOption<Boolean> BOUNDED = ConfigOptions
-			.key("bounded")
-			.booleanType()
-			.defaultValue(false);
-
-	private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions
-			.key("changelog-mode")
-			.stringType()
-			.defaultValue("I"); // all available "I,UA,UB,D"
-
-	private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions
-			.key("runtime-source")
-			.stringType()
-			.defaultValue("SourceFunction"); // another is "InputFormat"
-
-	private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions
-			.key("nested-projection-supported")
-			.booleanType()
-			.defaultValue(false);
-
-	@Override
-	public String factoryIdentifier() {
-		return IDENTIFIER;
-	}
-
-	@Override
-	public DynamicTableSource createDynamicTableSource(Context context) {
-		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-		helper.validate();
-		ChangelogMode changelogMode = parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
-		String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
-		boolean isBounded = helper.getOptions().get(BOUNDED);
-		String dataId = helper.getOptions().get(DATA_ID);
-		boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
-
-		Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList());
-		DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		return new TestProjectableValuesTableSource(
-				changelogMode,
-				isBounded,
-				runtimeSource,
-				rowDataType,
-				data,
-				nestedProjectionSupported);
-	}
-
-	@Override
-	public Set<ConfigOption<?>> requiredOptions() {
-		return Collections.emptySet();
-	}
-
-	@Override
-	public Set<ConfigOption<?>> optionalOptions() {
-		return new HashSet<>(Arrays.asList(
-				DATA_ID,
-				CHANGELOG_MODE,
-				BOUNDED,
-				RUNTIME_SOURCE,
-				NESTED_PROJECTION_SUPPORTED));
-	}
-
-	private ChangelogMode parseChangelogMode(String string) {
-		ChangelogMode.Builder builder = ChangelogMode.newBuilder();
-		for (String split : string.split(",")) {
-			switch (split.trim()) {
-				case "I":
-					builder.addContainedKind(RowKind.INSERT);
-					break;
-				case "UB":
-					builder.addContainedKind(RowKind.UPDATE_BEFORE);
-					break;
-				case "UA":
-					builder.addContainedKind(RowKind.UPDATE_AFTER);
-					break;
-				case "D":
-					builder.addContainedKind(RowKind.DELETE);
-					break;
-				default:
-					throw new IllegalArgumentException("Invalid ChangelogMode string: " + string);
-			}
-		}
-		return builder.build();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Table source
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Values {@link DynamicTableSource} for testing.
-	 */
-	private static class TestProjectableValuesTableSource implements ScanTableSource, SupportsProjectionPushDown {
-
-		private final ChangelogMode changelogMode;
-		private final boolean bounded;
-		private final String runtimeSource;
-		private DataType physicalRowDataType;
-		private final Collection<Tuple2<RowKind, Row>> data;
-		private final boolean nestedProjectionSupported;
-		private int[] projectedFields = null;
-
-		private TestProjectableValuesTableSource(
-				ChangelogMode changelogMode,
-				boolean bounded, String runtimeSource,
-				DataType physicalRowDataType,
-				Collection<Tuple2<RowKind, Row>> data,
-				boolean nestedProjectionSupported) {
-			this.changelogMode = changelogMode;
-			this.bounded = bounded;
-			this.runtimeSource = runtimeSource;
-			this.physicalRowDataType = physicalRowDataType;
-			this.data = data;
-			this.nestedProjectionSupported = nestedProjectionSupported;
-		}
-
-		@Override
-		public ChangelogMode getChangelogMode() {
-			return changelogMode;
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
-			TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
-					.createTypeInformation(physicalRowDataType)
-					.createSerializer(new ExecutionConfig());
-			DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
-			Collection<RowData> values = convertToRowData(data, projectedFields, converter);
-
-			if (runtimeSource.equals("SourceFunction")) {
-				try {
-					return SourceFunctionProvider.of(
-							new FromElementsFunction<>(serializer, values),
-							bounded);
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
-			} else if (runtimeSource.equals("InputFormat")) {
-				return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer));
-			} else {
-				throw new IllegalArgumentException("Unsupported runtime source class: " + runtimeSource);
-			}
-		}
-
-		@Override
-		public DynamicTableSource copy() {
-			TestProjectableValuesTableSource newTableSource = new TestProjectableValuesTableSource(
-					changelogMode, bounded, runtimeSource, physicalRowDataType, data, nestedProjectionSupported);
-			newTableSource.projectedFields = projectedFields;
-			return newTableSource;
-		}
-
-		@Override
-		public String asSummaryString() {
-			return "TestProjectableValues";
-		}
-
-		private static Collection<RowData> convertToRowData(
-				Collection<Tuple2<RowKind, Row>> data,
-				@Nullable int[] projectedFields,
-				DataStructureConverter converter) {
-			List<RowData> result = new ArrayList<>();
-			for (Tuple2<RowKind, Row> value : data) {
-				Row projectedRow;
-				if (projectedFields == null) {
-					projectedRow = value.f1;
-				} else {
-					Object[] newValues = new Object[projectedFields.length];
-					for (int i = 0; i < projectedFields.length; ++i) {
-						newValues[i] = value.f1.getField(projectedFields[i]);
-					}
-					projectedRow = Row.of(newValues);
-				}
-				RowData rowData = (RowData) converter.toInternal(projectedRow);
-				if (rowData != null) {
-					rowData.setRowKind(value.f0);
-					result.add(rowData);
-				}
-			}
-			return result;
-		}
-
-		@Override
-		public boolean supportsNestedProjection() {
-			return nestedProjectionSupported;
-		}
-
-		@Override
-		public void applyProjection(int[][] projectedFields) {
-			this.projectedFields = new int[projectedFields.length];
-			FieldsDataType dataType = (FieldsDataType) physicalRowDataType;
-			RowType rowType = ((RowType) physicalRowDataType.getLogicalType());
-			DataTypes.Field[] fields = new DataTypes.Field[projectedFields.length];
-			for (int i = 0; i < projectedFields.length; ++i) {
-				int[] projection = projectedFields[i];
-				Preconditions.checkArgument(projection.length == 1);
-				int index = projection[0];
-				this.projectedFields[i] = index;
-				fields[i] = DataTypes.FIELD(rowType.getFieldNames().get(index), dataType.getChildren().get(index));
-			}
-			this.physicalRowDataType = DataTypes.ROW(fields);
-		}
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index e889814..47dcf42 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -55,7 +56,6 @@ import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.Retra
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -235,6 +235,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		.booleanType()
 		.defaultValue(true);
 
+	private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions
+		.key("nested-projection-supported")
+		.booleanType()
+		.defaultValue(false);
+
 	@Override
 	public String factoryIdentifier() {
 		return IDENTIFIER;
@@ -251,18 +256,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		String sourceClass = helper.getOptions().get(TABLE_SOURCE_CLASS);
 		boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
 		String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
+		boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
 
 		if (sourceClass.equals("DEFAULT")) {
 			Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList());
-			DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+			TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 			return new TestValuesTableSource(
+				physicalSchema,
 				changelogMode,
 				isBounded,
 				runtimeSource,
-				rowDataType,
 				data,
 				isAsync,
-				lookupFunctionClass);
+				lookupFunctionClass,
+				nestedProjectionSupported,
+				null);
 		} else {
 			try {
 				return InstantiationUtil.instantiate(
@@ -306,7 +314,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 			ASYNC_ENABLED,
 			TABLE_SOURCE_CLASS,
 			SINK_INSERT_ONLY,
-			RUNTIME_SINK));
+			RUNTIME_SINK,
+			NESTED_PROJECTION_SUPPORTED));
 	}
 
 	private ChangelogMode parseChangelogMode(String string) {
@@ -339,30 +348,37 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
+		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
-		private final DataType physicalRowDataType;
 		private final Collection<Tuple2<RowKind, Row>> data;
 		private final boolean isAsync;
 		private final @Nullable String lookupFunctionClass;
+		private final boolean nestedProjectionSupported;
+		private @Nullable int[] projectedFields;
 
 		private TestValuesTableSource(
+				TableSchema physicalSchema,
 				ChangelogMode changelogMode,
-				boolean bounded, String runtimeSource,
-				DataType physicalRowDataType,
+				boolean bounded,
+				String runtimeSource,
 				Collection<Tuple2<RowKind, Row>> data,
 				boolean isAsync,
-				@Nullable String lookupFunctionClass) {
+				@Nullable String lookupFunctionClass,
+				boolean nestedProjectionSupported,
+				int[] projectedFields) {
+			this.physicalSchema = physicalSchema;
 			this.changelogMode = changelogMode;
 			this.bounded = bounded;
 			this.runtimeSource = runtimeSource;
-			this.physicalRowDataType = physicalRowDataType;
 			this.data = data;
 			this.isAsync = isAsync;
 			this.lookupFunctionClass = lookupFunctionClass;
+			this.nestedProjectionSupported = nestedProjectionSupported;
+			this.projectedFields = projectedFields;
 		}
 
 		@Override
@@ -374,11 +390,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		@Override
 		public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
 			TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
-				.createTypeInformation(physicalRowDataType)
+				.createTypeInformation(physicalSchema.toRowDataType())
 				.createSerializer(new ExecutionConfig());
-			DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
+			DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalSchema.toRowDataType());
 			converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
-			Collection<RowData> values = convertToRowData(data, converter);
+			Collection<RowData> values = convertToRowData(data, projectedFields, converter);
 
 			if (runtimeSource.equals("SourceFunction")) {
 				try {
@@ -438,8 +454,28 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		}
 
 		@Override
+		public boolean supportsNestedProjection() {
+			return nestedProjectionSupported;
+		}
+
+		@Override
+		public void applyProjection(int[][] projectedFields) {
+			this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
+		}
+
+		@Override
 		public DynamicTableSource copy() {
-			return new TestValuesTableSource(changelogMode, bounded, runtimeSource, physicalRowDataType, data, isAsync, lookupFunctionClass);
+			return new TestValuesTableSource(
+				physicalSchema,
+				changelogMode,
+				bounded,
+				runtimeSource,
+				data,
+				isAsync,
+				lookupFunctionClass,
+				nestedProjectionSupported,
+				projectedFields);
 		}
 
 		@Override
@@ -449,10 +485,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 
 		private static Collection<RowData> convertToRowData(
 				Collection<Tuple2<RowKind, Row>> data,
+				int[] projectedFields,
 				DataStructureConverter converter) {
 			List<RowData> result = new ArrayList<>();
 			for (Tuple2<RowKind, Row> value : data) {
-				RowData rowData = (RowData) converter.toInternal(value.f1);
+				Row projectedRow;
+				if (projectedFields == null) {
+					projectedRow = value.f1;
+				} else {
+					Object[] newValues = new Object[projectedFields.length];
+					for (int i = 0; i < projectedFields.length; ++i) {
+						newValues[i] = value.f1.getField(projectedFields[i]);
+					}
+					projectedRow = Row.of(newValues);
+				}
+				RowData rowData = (RowData) converter.toInternal(projectedRow);
 				if (rowData != null) {
 					rowData.setRowKind(value.f0);
 					result.add(rowData);
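
Taken together, applyProjection records the top-level field indexes and convertToRowData replays them per record. A condensed sketch of that per-row step, with hypothetical data (names and values are illustrative, not taken from the diff):

    // Hypothetical row of schema ROW<a INT, b STRING, c BIGINT>.
    Row original = Row.of(1, "hello", 100L);

    // Indexes as recorded by applyProjection(new int[][]{{2}, {0}}).
    int[] projectedFields = {2, 0};

    // Same loop as in convertToRowData: pick the projected fields in order.
    Object[] newValues = new Object[projectedFields.length];
    for (int i = 0; i < projectedFields.length; i++) {
        newValues[i] = original.getField(projectedFields[i]);
    }
    Row projectedRow = Row.of(newValues);  // holds (100L, 1)
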
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index 955e3d4..b8a5b0b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -54,7 +54,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
 						"  b bigint,\n" +
 						"  c string\n" +
 						") WITH (\n" +
-						" 'connector' = 'projectable-values',\n" +
+						" 'connector' = 'values',\n" +
 						" 'bounded' = 'true'\n" +
 						")";
 		util().tableEnv().executeSql(ddl1);
@@ -66,7 +66,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
 						"  c string,\n" +
 						"  d as a + 1\n" +
 						") WITH (\n" +
-						" 'connector' = 'projectable-values',\n" +
+						" 'connector' = 'values',\n" +
 						" 'bounded' = 'true'\n" +
 						")";
 		util().tableEnv().executeSql(ddl2);
@@ -92,7 +92,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac
 						"  nested row<name string, `value` int>,\n" +
 						"  name string\n" +
 						") WITH (\n" +
-						" 'connector' = 'projectable-values',\n" +
+						" 'connector' = 'values',\n" +
 						" 'nested-projection-supported' = '" + nestedProjectionSupported + "',\n" +
 						"  'bounded' = 'true'\n" +
 						")";
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7632f4b..498fb98 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,5 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.table.planner.factories.TestValuesTableFactory
-org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
 org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 9c08f74..5e8c3fe 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -55,7 +55,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
 GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D])
 +- Exchange(distribution=[single], changelogMode=[I,UB,UA])
    +- Calc(select=[0 AS $f0], where=[>(a, 1)], changelogMode=[I,UB,UA])
-      +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[ts, a, b], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a]]], fields=[a], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index 7b12fcf..9699a05 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase {
          |  b bigint,
          |  c varchar(32)
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'true'
          |)
        """.stripMargin
@@ -49,7 +49,7 @@ class TableSourceTest extends TableTestBase {
         |  nested row<name string, `value` int>,
         |  name string
         |) WITH (
-        | 'connector' = 'projectable-values',
+        | 'connector' = 'values',
         |  'bounded' = 'true'
         |)
         |""".stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 5f15680..b5f252b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -37,7 +37,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -57,7 +57,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
          |  pTime as PROCTIME(),
          |  watermark for pTime as pTime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -128,7 +128,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -148,7 +148,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -168,7 +168,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -188,7 +188,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -208,7 +208,7 @@ class TableSourceTest extends TableTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'bounded' = 'false'
          |)
@@ -235,7 +235,7 @@ class TableSourceTest extends TableTestBase {
          |  id int,
          |  name varchar(32)
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index 9d3ce05..0f0ead9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -39,7 +39,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -61,7 +61,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
          |  proctime as PROCTIME(),
          |  watermark for proctime as proctime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  proctime as PROCTIME()
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -132,7 +132,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -154,7 +154,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -175,7 +175,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -196,7 +196,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -217,7 +217,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'bounded' = 'false'
          |)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index dda3bbe..2d799d1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
 import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
 import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
-import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule
 import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil.parseFieldNames
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -42,7 +42,6 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
 import org.apache.flink.types.Row
-
 import org.junit.Assert.assertEquals
 import org.junit._
 
@@ -1250,7 +1249,7 @@ class CalcITCase extends BatchTestBase {
 
   @Test
   def testSimpleProject(): Unit = {
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+    val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3)
     val ddl =
       s"""
          |CREATE TABLE SimpleTable (
@@ -1258,7 +1257,7 @@ class CalcITCase extends BatchTestBase {
          |  b bigint,
          |  c string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
          |)
@@ -1278,7 +1277,7 @@ class CalcITCase extends BatchTestBase {
       row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"),
       row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji")
     )
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+    val myTableDataId = TestValuesTableFactory.registerData(data)
     val ddl =
       s"""
          |CREATE TABLE NestedTable (
@@ -1288,7 +1287,7 @@ class CalcITCase extends BatchTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 7cc6e66..d32c9ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.typeutils.Types
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.data.{GenericRowData, RowData}
-import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
@@ -290,7 +290,7 @@ class CalcITCase extends StreamingTestBase {
 
   @Test
   def testSimpleProject(): Unit = {
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+    val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3)
     val ddl =
       s"""
          |CREATE TABLE SimpleTable (
@@ -298,7 +298,7 @@ class CalcITCase extends StreamingTestBase {
          |  b bigint,
          |  c string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
          |)
@@ -321,7 +321,7 @@ class CalcITCase extends StreamingTestBase {
       row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"),
       row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji")
     )
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+    val myTableDataId = TestValuesTableFactory.registerData(data)
     val ddl =
       s"""
          |CREATE TABLE NestedTable (
@@ -331,7 +331,7 @@ class CalcITCase extends StreamingTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 5b59ffb..403a240 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.writer.BinaryRowWriter
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM
@@ -81,7 +81,6 @@ class BatchTestBase extends BatchAbstractTestBase {
   @After
   def after(): Unit = {
     TestValuesTableFactory.clearAllData()
-    TestProjectableValuesTableFactory.clearAllRegisteredData()
   }
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index 3b6bbe4..3fffbc0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.{EnvironmentSettings, ImplicitExpressionConversions}
-import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory}
+import org.apache.flink.table.api.ImplicitExpressionConversions
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.api.{EnvironmentSettings, Table}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
@@ -62,7 +62,6 @@ class StreamingTestBase extends AbstractTestBase {
   def after(): Unit = {
     StreamTestSink.clear()
     TestValuesTableFactory.clearAllData()
-    TestProjectableValuesTableFactory.clearAllRegisteredData()
   }
 
   /**


[flink] 03/03: [FLINK-17798][connector/jdbc] Align the behavior between the new and legacy JDBC table source

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 73520ca19e76d0895c38ec956250cb588eca740c
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:48:25 2020 +0800

    [FLINK-17798][connector/jdbc] Align the behavior between the new and legacy JDBC table source
    
    This closes #12221
---
 flink-connectors/flink-connector-jdbc/pom.xml      |  2 +-
 .../jdbc/table/JdbcDynamicTableSource.java         | 48 ++++++++++---------
 .../table/JdbcDynamicTableSourceSinkFactory.java   |  7 +--
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   | 45 ++++++++++--------
 ...ctionITCase.java => JdbcLookupTableITCase.java} | 43 +++++++++--------
 .../connector/jdbc/table/JdbcTablePlanTest.java    | 54 ++++++++++++++++++++++
 .../jdbc/table/JdbcTableSourceITCase.java          |  1 -
 .../connector/jdbc/table/JdbcTablePlanTest.xml     | 35 ++++++++++++++
 8 files changed, 168 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index d259ce3..be107af 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -94,7 +94,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 248ffe1..21a80a2 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -32,37 +32,35 @@ import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A {@link DynamicTableSource} for JDBC.
  */
 @Internal
-public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource {
+public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
 	private final JdbcOptions options;
 	private final JdbcReadOptions readOptions;
 	private final JdbcLookupOptions lookupOptions;
-	private final TableSchema schema;
-	private final int[] selectFields;
+	private TableSchema physicalSchema;
 	private final String dialectName;
 
 	public JdbcDynamicTableSource(
 			JdbcOptions options,
 			JdbcReadOptions readOptions,
 			JdbcLookupOptions lookupOptions,
-			TableSchema schema,
-			int[] selectFields) {
+			TableSchema physicalSchema) {
 		this.options = options;
 		this.readOptions = readOptions;
 		this.lookupOptions = lookupOptions;
-		this.schema = schema;
-		this.selectFields = selectFields;
+		this.physicalSchema = physicalSchema;
 		this.dialectName = options.getDialect().dialectName();
 	}
 
@@ -74,15 +72,15 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 			int[] innerKeyArr = context.getKeys()[i];
 			Preconditions.checkArgument(innerKeyArr.length == 1,
 				"JDBC only support non-nested look up keys");
-			keyNames[i] = schema.getFieldNames()[innerKeyArr[0]];
+			keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
 		}
-		final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+		final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
 
 		return TableFunctionProvider.of(new JdbcRowDataLookupFunction(
 			options,
 			lookupOptions,
-			schema.getFieldNames(),
-			schema.getFieldDataTypes(),
+			physicalSchema.getFieldNames(),
+			physicalSchema.getFieldDataTypes(),
 			keyNames,
 			rowType));
 	}
@@ -101,7 +99,7 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 		}
 		final JdbcDialect dialect = options.getDialect();
 		String query = dialect.getSelectFromStatement(
-			options.getTableName(), schema.getFieldNames(), new String[0]);
+			options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
 		if (readOptions.getPartitionColumnName().isPresent()) {
 			long lowerBound = readOptions.getPartitionLowerBound().get();
 			long upperBound = readOptions.getPartitionUpperBound().get();
@@ -113,10 +111,10 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 				" BETWEEN ? AND ?";
 		}
 		builder.setQuery(query);
-		final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+		final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
 		builder.setRowConverter(dialect.getRowConverter(rowType));
 		builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
-			.createTypeInformation(schema.toRowDataType()));
+			.createTypeInformation(physicalSchema.toRowDataType()));
 
 		return InputFormatProvider.of(builder.build());
 	}
@@ -127,8 +125,19 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 	}
 
 	@Override
+	public boolean supportsNestedProjection() {
+		// JDBC doesn't support nested projection
+		return false;
+	}
+
+	@Override
+	public void applyProjection(int[][] projectedFields) {
+		this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+	}
+
+	@Override
 	public DynamicTableSource copy() {
-		return new JdbcDynamicTableSource(options, readOptions, lookupOptions, schema, selectFields);
+		return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
 	}
 
 	@Override
@@ -148,15 +157,12 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 		return Objects.equals(options, that.options) &&
 			Objects.equals(readOptions, that.readOptions) &&
 			Objects.equals(lookupOptions, that.lookupOptions) &&
-			Objects.equals(schema, that.schema) &&
-			Arrays.equals(selectFields, that.selectFields) &&
+			Objects.equals(physicalSchema, that.physicalSchema) &&
 			Objects.equals(dialectName, that.dialectName);
 	}
 
 	@Override
 	public int hashCode() {
-		int result = Objects.hash(options, readOptions, lookupOptions, schema, dialectName);
-		result = 31 * result + Arrays.hashCode(selectFields);
-		return result;
+		return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName);
 	}
 }
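
For orientation, the new SupportsProjectionPushDown path boils down to the
TableSchemaUtils.projectSchema helper introduced in this series. A minimal
sketch of its effect, using a hypothetical schema that is not part of the diff:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    public class ProjectSchemaSketch {
    	public static void main(String[] args) {
    		TableSchema physicalSchema = TableSchema.builder()
    			.field("id", DataTypes.BIGINT())
    			.field("timestamp9_col", DataTypes.TIMESTAMP(9))
    			.field("decimal_col", DataTypes.DECIMAL(10, 4))
    			.build();
    		// For "SELECT decimal_col, id" the planner passes the top-level
    		// field indexes {{2}, {0}} to applyProjection, which delegates here:
    		TableSchema projected = TableSchemaUtils.projectSchema(
    			physicalSchema, new int[][]{{2}, {0}});
    		// projected: ROW<decimal_col DECIMAL(10, 4), id BIGINT>; the scan
    		// query and the produced RowData then contain only these columns.
    		System.out.println(projected);
    	}
    }
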
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
index 28a129d..930a1b0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
@@ -173,16 +173,11 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
 		helper.validate();
 		validateConfigOptions(config);
 		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-		int[] selectFields = new int[physicalSchema.getFieldNames().length];
-		for (int i = 0; i < selectFields.length; i++) {
-			selectFields[i] = i;
-		}
 		return new JdbcDynamicTableSource(
 			getJdbcOptions(helper.getOptions()),
 			getJdbcReadOptions(helper.getOptions()),
 			getJdbcLookupOptions(helper.getOptions()),
-			physicalSchema,
-			selectFields);
+			physicalSchema);
 	}
 
 	private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index 6f93307..48be89e 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -22,10 +22,12 @@ import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,8 +36,12 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * ITCase for {@link JdbcDynamicTableSource}.
@@ -79,6 +85,7 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 			Statement stat = conn.createStatement()) {
 			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
 		}
+		StreamTestSink.clear();
 	}
 
 	@Test
@@ -106,16 +113,17 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 				")"
 		);
 
-		StreamITCase.clear();
-		tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE), Row.class)
-			.addSink(new StreamITCase.StringSink<>());
-		env.execute();
-
+		Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 		List<String> expected =
-			Arrays.asList(
+			Stream.of(
 				"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
-				"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
-		StreamITCase.compareWithList(expected);
+				"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234")
+			.sorted().collect(Collectors.toList());
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -147,15 +155,16 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 				")"
 		);
 
-		StreamITCase.clear();
-		tEnv.toAppendStream(tEnv.sqlQuery("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE), Row.class)
-			.addSink(new StreamITCase.StringSink<>());
-		env.execute();
-
+		Iterator<Row> collected = tEnv.executeSql("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 		List<String> expected =
-			Arrays.asList(
+			Stream.of(
 				"1,2020-01-01T15:35:00.123456,100.1234",
-				"2,2020-01-01T15:36:01.123456,101.1234");
-		StreamITCase.compareWithList(expected);
+				"2,2020-01-01T15:36:01.123456,101.1234")
+				.sorted().collect(Collectors.toList());
+		assertEquals(expected, result);
 	}
 }
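
The test changes above replace the StreamITCase sink with TableResult#collect.
The standalone pattern looks roughly like this; a minimal sketch, assuming a
table named "some_table" is already registered:

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    static List<String> collectSorted(TableEnvironment tEnv) {
    	// collect() returns an iterator over the result rows; sorting the
    	// stringified rows makes the assertion independent of arrival order.
    	Iterator<Row> it = tEnv.executeSql("SELECT * FROM some_table").collect();
    	List<String> result = new ArrayList<>();
    	while (it.hasNext()) {
    		result.add(it.next().toString());
    	}
    	result.sort(String::compareTo);
    	return result;
    }
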
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
similarity index 86%
rename from flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
rename to flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8d40cdd..793ea9d 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -22,17 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,16 +46,20 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertEquals;
 
 /**
- * IT case for {@link JdbcLookupFunction}.
+ * IT case for the lookup source of the JDBC connector.
  */
 @RunWith(Parameterized.class)
-public class JdbcLookupFunctionITCase extends AbstractTestBase {
+public class JdbcLookupTableITCase extends AbstractTestBase {
 
 	public static final String DB_URL = "jdbc:derby:memory:lookup";
 	public static final String LOOKUP_TABLE = "lookup_table";
@@ -63,7 +67,7 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 	private final String tableFactory;
 	private final boolean useCache;
 
-	public JdbcLookupFunctionITCase(String tableFactory, boolean useCache) {
+	public JdbcLookupTableITCase(String tableFactory, boolean useCache) {
 		this.useCache = useCache;
 		this.tableFactory = tableFactory;
 	}
@@ -143,16 +147,20 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 	}
 
 	@Test
-	public void test() throws Exception {
+	public void testLookup() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-		StreamITCase.clear();
 
+		Iterator<Row> collected;
 		if ("legacyFactory".equals(tableFactory)) {
-			useLegacyTableFactory(env, tEnv);
+			collected = useLegacyTableFactory(env, tEnv);
 		} else {
-			useDynamicTableFactory(env, tEnv);
+			collected = useDynamicTableFactory(env, tEnv);
 		}
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 
 		List<String> expected = new ArrayList<>();
 		expected.add("1,1,11-c1-v1,11-c2-v1");
@@ -162,11 +170,12 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 		expected.add("2,3,null,23-c2");
 		expected.add("2,5,25-c1,25-c2");
 		expected.add("3,8,38-c1,38-c2");
+		Collections.sort(expected);
 
-		StreamITCase.compareWithList(expected);
+		assertEquals(expected, result);
 	}
 
-	private void useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+	private Iterator<Row> useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
 		Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
 			new Tuple2<>(1, "1"),
 			new Tuple2<>(1, "1"),
@@ -195,13 +204,10 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 
 		String sqlQuery = "SELECT id1, id2, comment1, comment2 FROM T, " +
 			"LATERAL TABLE(jdbcLookup(id1, id2)) AS S(l_id1, l_id2, comment1, comment2)";
-		Table result = tEnv.sqlQuery(sqlQuery);
-		DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-		env.execute();
+		return tEnv.executeSql(sqlQuery).collect();
 	}
 
-	private void useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+	private Iterator<Row> useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
 		Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
 			new Tuple2<>(1, "1"),
 			new Tuple2<>(1, "1"),
@@ -229,9 +235,6 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 		String sqlQuery = "SELECT source.id1, source.id2, L.comment1, L.comment2 FROM T AS source " +
 			"JOIN lookup for system_time as of source.proctime AS L " +
 			"ON source.id1 = L.id1 and source.id2 = L.id2";
-		Table result = tEnv.sqlQuery(sqlQuery);
-		DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-		env.execute();
+		return tEnv.executeSql(sqlQuery).collect();
 	}
 }
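
For reference, the dynamic-factory path exercises the lookup source through a
processing-time temporal join. The general shape of such a query, with
hypothetical table and column names:

    // The probe side must expose a proctime attribute; every probe row
    // triggers a keyed lookup against the JDBC-backed table.
    tEnv.executeSql(
    	"SELECT source.id1, L.comment1 FROM T AS source " +
    	"JOIN lookup_table FOR SYSTEM_TIME AS OF source.proctime AS L " +
    	"ON source.id1 = L.id1").collect();
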
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
new file mode 100644
index 0000000..4efcb47
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/**
+ * Plan tests for the JDBC connector, for example, testing projection push down.
+ */
+public class JdbcTablePlanTest extends TableTestBase {
+
+	private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+	@Test
+	public void testProjectionPushDown() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE jdbc (" +
+				"id BIGINT," +
+				"timestamp6_col TIMESTAMP(6)," +
+				"timestamp9_col TIMESTAMP(9)," +
+				"time_col TIME," +
+				"real_col FLOAT," +
+				"double_col DOUBLE," +
+				"decimal_col DECIMAL(10, 4)" +
+				") WITH (" +
+				"  'connector'='jdbc'," +
+				"  'url'='jdbc:derby:memory:test'," +
+				"  'table-name'='test_table'" +
+				")"
+		);
+		util.verifyPlan("SELECT decimal_col, timestamp9_col, id FROM jdbc");
+	}
+
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 277191c..8115696 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -44,7 +44,6 @@ import java.util.stream.StreamSupport;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
-
 /**
  * ITCase for {@link JdbcTableSource}.
  */
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
new file mode 100644
index 0000000..9219fc8
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col, timestamp9_col, id])
+]]>
+    </Resource>
+  </TestCase>
+</Root>


[flink] 02/03: [FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57e4748614bdbe06769b147bc264e4a400784379
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:48:08 2020 +0800

    [FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source
    
    This closes #12221
---
 flink-connectors/flink-connector-hbase/pom.xml     |   8 ++
 .../connector/hbase/HBaseDynamicTableFactory.java  |  43 ++++++-
 .../hbase/source/HBaseDynamicTableSource.java      |  21 +++-
 .../flink/connector/hbase/util/HBaseSerde.java     |  24 ++--
 .../connector/hbase/util/HBaseTableSchema.java     |  12 --
 .../flink/connector/hbase/HBaseTablePlanTest.java  | 127 +++++++++++++++++++++
 .../flink/connector/hbase/HBaseTablePlanTest.xml   |  36 ++++++
 7 files changed, 242 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase/pom.xml b/flink-connectors/flink-connector-hbase/pom.xml
index d3115b5..b2e0b4a 100644
--- a/flink-connectors/flink-connector-hbase/pom.xml
+++ b/flink-connectors/flink-connector-hbase/pom.xml
@@ -207,6 +207,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index ba85577..64b381f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -103,15 +103,15 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 	public DynamicTableSource createDynamicTableSource(Context context) {
 		TableFactoryHelper helper = createTableFactoryHelper(this, context);
 		helper.validate();
+		TableSchema tableSchema = context.getCatalogTable().getSchema();
+		validatePrimaryKey(tableSchema);
+
 		String hTableName = helper.getOptions().get(TABLE_NAME);
 		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
 		Configuration hbaseClientConf = HBaseConfiguration.create();
 		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, helper.getOptions().get(ZOOKEEPER_QUORUM));
 		hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
-
 		String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
-
-		TableSchema tableSchema = context.getCatalogTable().getSchema();
 		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
 
 		return new HBaseDynamicTableSource(
@@ -125,6 +125,9 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 	public DynamicTableSink createDynamicTableSink(Context context) {
 		TableFactoryHelper helper = createTableFactoryHelper(this, context);
 		helper.validate();
+		TableSchema tableSchema = context.getCatalogTable().getSchema();
+		validatePrimaryKey(tableSchema);
+
 		HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
 		hbaseOptionsBuilder.setTableName(helper.getOptions().get(TABLE_NAME));
 		hbaseOptionsBuilder.setZkQuorum(helper.getOptions().get(ZOOKEEPER_QUORUM));
@@ -136,10 +139,7 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 			.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
 		helper.getOptions().getOptional(SINK_BUFFER_FLUSH_MAX_ROWS)
 			.ifPresent(writeBuilder::setBufferFlushMaxRows);
-
 		String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
-
-		TableSchema tableSchema = context.getCatalogTable().getSchema();
 		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
 
 		return new HBaseDynamicTableSink(
@@ -172,4 +172,35 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 		set.add(SINK_BUFFER_FLUSH_INTERVAL);
 		return set;
 	}
+
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Checks that the HBase table has a row key defined. A row key is defined as an atomic type,
+	 * while column families and qualifiers are defined as ROW types. There shouldn't be multiple
+	 * atomic-type columns in the schema. The PRIMARY KEY constraint is optional; if it exists,
+	 * it must be defined on the single row key field.
+	 */
+	private static void validatePrimaryKey(TableSchema schema) {
+		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+		if (!hbaseSchema.getRowKeyName().isPresent()) {
+			throw new IllegalArgumentException(
+				"HBase table requires to define a row key field. " +
+					"A row key field is defined as an atomic type, " +
+					"column families and qualifiers are defined as ROW type.");
+		}
+		schema.getPrimaryKey().ifPresent(k -> {
+			if (k.getColumns().size() > 1) {
+				throw new IllegalArgumentException(
+					"HBase table doesn't support a primary Key on multiple columns. " +
+						"The primary key of HBase table must be defined on row key field.");
+			}
+			if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
+				throw new IllegalArgumentException(
+					"Primary key of HBase table must be defined on the row key field. " +
+						"A row key field is defined as an atomic type, " +
+						"column families and qualifiers are defined as ROW type.");
+			}
+		});
+	}
 }
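
Putting the validation rules together, a schema passes validatePrimaryKey when
it has exactly one atomic-type column (the row key) and any PRIMARY KEY
constraint names that column. A minimal accepted DDL, mirroring the plan tests
further below (hypothetical table and column names):

    tEnv.executeSql(
    	"CREATE TABLE hTable (" +
    	" family1 ROW<col1 INT>," +
    	" rowkey STRING," +
    	" PRIMARY KEY (rowkey) NOT ENFORCED" +
    	") WITH (" +
    	" 'connector' = 'hbase-1.4'," +
    	" 'table-name' = 'my_table'," +
    	" 'zookeeper.quorum' = 'localhost:2021'" +
    	")");
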
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index e59a12e..dcc5a5b 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -21,12 +21,15 @@ package org.apache.flink.connector.hbase.source;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.utils.TableSchemaUtils;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -36,11 +39,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * HBase table source implementation.
  */
 @Internal
-public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource {
+public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
 	private final Configuration conf;
 	private final String tableName;
-	private final HBaseTableSchema hbaseSchema;
+	private HBaseTableSchema hbaseSchema;
 	private final String nullStringLiteral;
 
 	public HBaseDynamicTableSource(
@@ -78,6 +81,20 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour
 	}
 
 	@Override
+	public boolean supportsNestedProjection() {
+		// planner doesn't support nested projection push down yet.
+		return false;
+	}
+
+	@Override
+	public void applyProjection(int[][] projectedFields) {
+		TableSchema projectSchema = TableSchemaUtils.projectSchema(
+			hbaseSchema.convertsToTableSchema(),
+			projectedFields);
+		this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema);
+	}
+
+	@Override
 	public ChangelogMode getChangelogMode() {
 		return ChangelogMode.insertOnly();
 	}
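
Because supportsNestedProjection() returns false, the planner can only push
whole column families into the source; access to individual qualifiers stays
in a Calc on top of the scan (see the testProjectionPushDown plan at the end
of this commit). A sketch with a hypothetical schema:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    TableSchema full = TableSchema.builder()
    	.field("family1", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.INT())))
    	.field("family2", DataTypes.ROW(
    		DataTypes.FIELD("col1", DataTypes.STRING()),
    		DataTypes.FIELD("col2", DataTypes.BIGINT())))
    	.field("rowkey", DataTypes.INT())
    	.build();
    // "SELECT family2.col2" pushes the whole family2 (top-level index 1);
    // the nested col2 access is evaluated after the scan.
    TableSchema projected = TableSchemaUtils.projectSchema(full, new int[][]{{1}});
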
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index ed4a11f..e5a377f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -67,8 +67,8 @@ public class HBaseSerde {
 	private GenericRowData reusedRow;
 	private GenericRowData[] reusedFamilyRows;
 
-	private final FieldEncoder keyEncoder;
-	private final FieldDecoder keyDecoder;
+	private final @Nullable FieldEncoder keyEncoder;
+	private final @Nullable FieldDecoder keyDecoder;
 	private final FieldEncoder[][] qualifierEncoders;
 	private final FieldDecoder[][] qualifierDecoders;
 
@@ -78,18 +78,21 @@ public class HBaseSerde {
 		LogicalType rowkeyType = hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);
 
 		// field length needs to take the row key into account if it exists.
-		checkArgument(rowkeyIndex != -1 && rowkeyType != null, "row key is not set.");
-		this.fieldLength = families.length + 1;
+		if (rowkeyIndex != -1 && rowkeyType != null) {
+			this.fieldLength = families.length + 1;
+			this.keyEncoder = createFieldEncoder(rowkeyType);
+			this.keyDecoder = createFieldDecoder(rowkeyType);
+		} else {
+			this.fieldLength = families.length;
+			this.keyEncoder = null;
+			this.keyDecoder = null;
+		}
 		this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);
 
 		// prepare output rows
 		this.reusedRow = new GenericRowData(fieldLength);
 		this.reusedFamilyRows = new GenericRowData[families.length];
 
-		// row key should never be null
-		this.keyEncoder = createFieldEncoder(rowkeyType);
-		this.keyDecoder = createFieldDecoder(rowkeyType);
-
 		this.qualifiers = new byte[families.length][][];
 		this.qualifierEncoders = new FieldEncoder[families.length][];
 		this.qualifierDecoders = new FieldDecoder[families.length][];
@@ -115,6 +118,7 @@ public class HBaseSerde {
 	 * @return The appropriate instance of Put for this use case.
 	 */
 	public @Nullable Put createPutMutation(RowData row) {
+		checkArgument(keyEncoder != null, "row key is not set.");
 		byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
 		if (rowkey.length == 0) {
 			// drop dirty records, rowkey shouldn't be zero length
@@ -146,6 +150,7 @@ public class HBaseSerde {
 	 * @return The appropriate instance of Delete for this use case.
 	 */
 	public @Nullable Delete createDeleteMutation(RowData row) {
+		checkArgument(keyEncoder != null, "row key is not set.");
 		byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
 		if (rowkey.length == 0) {
 			// drop dirty records, rowkey shouldn't be zero length
@@ -189,9 +194,10 @@ public class HBaseSerde {
 	 * Converts HBase {@link Result} into {@link RowData}.
 	 */
 	public RowData convertToRow(Result result) {
-		Object rowkey = keyDecoder.decode(result.getRow());
 		for (int i = 0; i < fieldLength; i++) {
 			if (rowkeyIndex == i) {
+				assert keyDecoder != null;
+				Object rowkey = keyDecoder.decode(result.getRow());
 				reusedRow.setField(rowkeyIndex, rowkey);
 			} else {
 				int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index 41108f4..116b1ae 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -361,18 +361,6 @@ public class HBaseTableSchema implements Serializable {
 					"Unsupported field type '" + fieldType + "' for HBase.");
 			}
 		}
-		schema.getPrimaryKey().ifPresent(k -> {
-			if (k.getColumns().size() > 1 ||
-					!hbaseSchema.getRowKeyName().isPresent() ||
-					!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
-				throw new IllegalArgumentException(
-					"Primary Key of HBase table should only be defined on the row key field.");
-			}
-		});
-		if (!hbaseSchema.getRowKeyName().isPresent()) {
-			throw new IllegalArgumentException(
-				"HBase table requires to define a row key field. A row key field must be an atomic type.");
-		}
 		return hbaseSchema;
 	}
 
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
new file mode 100644
index 0000000..053cf99
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+import static org.apache.flink.core.testutils.CoreMatchers.containsCause;
+
+/**
+ * Plan tests for the HBase connector, for example, testing projection push down.
+ */
+public class HBaseTablePlanTest extends TableTestBase {
+
+	private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+	@Test
+	public void testMultipleRowKey() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" rowkey INT," +
+				" rowkey2 STRING " +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException("Row key can't be set multiple times.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testNoneRowKey() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>" +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException(
+			"HBase table requires to define a row key field. " +
+				"A row key field is defined as an atomic type, " +
+				"column families and qualifiers are defined as ROW type.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testInvalidPrimaryKey() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" rowkey STRING, " +
+				" PRIMARY KEY (family1) NOT ENFORCED " +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException(
+			"Primary key of HBase table must be defined on the row key field. " +
+				"A row key field is defined as an atomic type, " +
+				"column families and qualifiers are defined as ROW type.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testUnsupportedDataType() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" col1 ARRAY<STRING>, " +
+				" rowkey STRING, " +
+				" PRIMARY KEY (rowkey) NOT ENFORCED " +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException(
+			"Unsupported field type 'ARRAY<STRING>' for HBase.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testProjectionPushDown() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>," +
+				" rowkey INT," +
+				" PRIMARY KEY (rowkey) NOT ENFORCED" +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		util.verifyPlan("SELECT h.family3, h.family2.col2 FROM hTable AS h");
+	}
+
+}
diff --git a/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
new file mode 100644
index 0000000..8391b1b
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT h.family3, h.family2.col2 FROM hTable AS h]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(family3=[$2], col2=[$1.col2])
++- LogicalTableScan(table=[[default_catalog, default_database, hTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[family3, family2.col2 AS col2])
++- TableSourceScan(table=[[default_catalog, default_database, hTable, project=[family3, family2]]], fields=[family3, family2])
+]]>
+    </Resource>
+  </TestCase>
+</Root>