You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 02:12:52 UTC

[flink] 03/03: [FLINK-17798][connector/jdbc] Align the behavior between the new and legacy JDBC table source

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 73520ca19e76d0895c38ec956250cb588eca740c
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:48:25 2020 +0800

    [FLINK-17798][connector/jdbc] Align the behavior between the new and legacy JDBC table source
    
    This closes #12221
---
 flink-connectors/flink-connector-jdbc/pom.xml      |  2 +-
 .../jdbc/table/JdbcDynamicTableSource.java         | 48 ++++++++++---------
 .../table/JdbcDynamicTableSourceSinkFactory.java   |  7 +--
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   | 45 ++++++++++--------
 ...ctionITCase.java => JdbcLookupTableITCase.java} | 43 +++++++++--------
 .../connector/jdbc/table/JdbcTablePlanTest.java    | 54 ++++++++++++++++++++++
 .../jdbc/table/JdbcTableSourceITCase.java          |  1 -
 .../connector/jdbc/table/JdbcTablePlanTest.xml     | 35 ++++++++++++++
 8 files changed, 168 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index d259ce3..be107af 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -94,7 +94,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 248ffe1..21a80a2 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -32,37 +32,35 @@ import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A {@link DynamicTableSource} for JDBC.
  */
 @Internal
-public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource {
+public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
 	private final JdbcOptions options;
 	private final JdbcReadOptions readOptions;
 	private final JdbcLookupOptions lookupOptions;
-	private final TableSchema schema;
-	private final int[] selectFields;
+	private TableSchema physicalSchema;
 	private final String dialectName;
 
 	public JdbcDynamicTableSource(
 			JdbcOptions options,
 			JdbcReadOptions readOptions,
 			JdbcLookupOptions lookupOptions,
-			TableSchema schema,
-			int[] selectFields) {
+			TableSchema physicalSchema) {
 		this.options = options;
 		this.readOptions = readOptions;
 		this.lookupOptions = lookupOptions;
-		this.schema = schema;
-		this.selectFields = selectFields;
+		this.physicalSchema = physicalSchema;
 		this.dialectName = options.getDialect().dialectName();
 	}
 
@@ -74,15 +72,15 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 			int[] innerKeyArr = context.getKeys()[i];
 			Preconditions.checkArgument(innerKeyArr.length == 1,
 				"JDBC only support non-nested look up keys");
-			keyNames[i] = schema.getFieldNames()[innerKeyArr[0]];
+			keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
 		}
-		final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+		final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
 
 		return TableFunctionProvider.of(new JdbcRowDataLookupFunction(
 			options,
 			lookupOptions,
-			schema.getFieldNames(),
-			schema.getFieldDataTypes(),
+			physicalSchema.getFieldNames(),
+			physicalSchema.getFieldDataTypes(),
 			keyNames,
 			rowType));
 	}
@@ -101,7 +99,7 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 		}
 		final JdbcDialect dialect = options.getDialect();
 		String query = dialect.getSelectFromStatement(
-			options.getTableName(), schema.getFieldNames(), new String[0]);
+			options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
 		if (readOptions.getPartitionColumnName().isPresent()) {
 			long lowerBound = readOptions.getPartitionLowerBound().get();
 			long upperBound = readOptions.getPartitionUpperBound().get();
@@ -113,10 +111,10 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 				" BETWEEN ? AND ?";
 		}
 		builder.setQuery(query);
-		final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+		final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
 		builder.setRowConverter(dialect.getRowConverter(rowType));
 		builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
-			.createTypeInformation(schema.toRowDataType()));
+			.createTypeInformation(physicalSchema.toRowDataType()));
 
 		return InputFormatProvider.of(builder.build());
 	}
@@ -127,8 +125,19 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 	}
 
 	@Override
+	public boolean supportsNestedProjection() {
+		// JDBC doesn't support nested projection
+		return false;
+	}
+
+	@Override
+	public void applyProjection(int[][] projectedFields) {
+		this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+	}
+
+	@Override
 	public DynamicTableSource copy() {
-		return new JdbcDynamicTableSource(options, readOptions, lookupOptions, schema, selectFields);
+		return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
 	}
 
 	@Override
@@ -148,15 +157,12 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 		return Objects.equals(options, that.options) &&
 			Objects.equals(readOptions, that.readOptions) &&
 			Objects.equals(lookupOptions, that.lookupOptions) &&
-			Objects.equals(schema, that.schema) &&
-			Arrays.equals(selectFields, that.selectFields) &&
+			Objects.equals(physicalSchema, that.physicalSchema) &&
 			Objects.equals(dialectName, that.dialectName);
 	}
 
 	@Override
 	public int hashCode() {
-		int result = Objects.hash(options, readOptions, lookupOptions, schema, dialectName);
-		result = 31 * result + Arrays.hashCode(selectFields);
-		return result;
+		return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName);
 	}
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
index 28a129d..930a1b0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
@@ -173,16 +173,11 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
 		helper.validate();
 		validateConfigOptions(config);
 		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-		int[] selectFields = new int[physicalSchema.getFieldNames().length];
-		for (int i = 0; i < selectFields.length; i++) {
-			selectFields[i] = i;
-		}
 		return new JdbcDynamicTableSource(
 			getJdbcOptions(helper.getOptions()),
 			getJdbcReadOptions(helper.getOptions()),
 			getJdbcLookupOptions(helper.getOptions()),
-			physicalSchema,
-			selectFields);
+			physicalSchema);
 	}
 
 	private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index 6f93307..48be89e 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -22,10 +22,12 @@ import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,8 +36,12 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * ITCase for {@link JdbcDynamicTableSource}.
@@ -79,6 +85,7 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 			Statement stat = conn.createStatement()) {
 			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
 		}
+		StreamTestSink.clear();
 	}
 
 	@Test
@@ -106,16 +113,17 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 				")"
 		);
 
-		StreamITCase.clear();
-		tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE), Row.class)
-			.addSink(new StreamITCase.StringSink<>());
-		env.execute();
-
+		Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 		List<String> expected =
-			Arrays.asList(
+			Stream.of(
 				"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
-				"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
-		StreamITCase.compareWithList(expected);
+				"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234")
+			.sorted().collect(Collectors.toList());
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -147,15 +155,16 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 				")"
 		);
 
-		StreamITCase.clear();
-		tEnv.toAppendStream(tEnv.sqlQuery("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE), Row.class)
-			.addSink(new StreamITCase.StringSink<>());
-		env.execute();
-
+		Iterator<Row> collected = tEnv.executeSql("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 		List<String> expected =
-			Arrays.asList(
+			Stream.of(
 				"1,2020-01-01T15:35:00.123456,100.1234",
-				"2,2020-01-01T15:36:01.123456,101.1234");
-		StreamITCase.compareWithList(expected);
+				"2,2020-01-01T15:36:01.123456,101.1234")
+				.sorted().collect(Collectors.toList());
+		assertEquals(expected, result);
 	}
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
similarity index 86%
rename from flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
rename to flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8d40cdd..793ea9d 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -22,17 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,16 +46,20 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertEquals;
 
 /**
- * IT case for {@link JdbcLookupFunction}.
+ * IT case for lookup source of JDBC connector.
  */
 @RunWith(Parameterized.class)
-public class JdbcLookupFunctionITCase extends AbstractTestBase {
+public class JdbcLookupTableITCase extends AbstractTestBase {
 
 	public static final String DB_URL = "jdbc:derby:memory:lookup";
 	public static final String LOOKUP_TABLE = "lookup_table";
@@ -63,7 +67,7 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 	private final String tableFactory;
 	private final boolean useCache;
 
-	public JdbcLookupFunctionITCase(String tableFactory, boolean useCache) {
+	public JdbcLookupTableITCase(String tableFactory, boolean useCache) {
 		this.useCache = useCache;
 		this.tableFactory = tableFactory;
 	}
@@ -143,16 +147,20 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 	}
 
 	@Test
-	public void test() throws Exception {
+	public void testLookup() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-		StreamITCase.clear();
 
+		Iterator<Row> collected;
 		if ("legacyFactory".equals(tableFactory)) {
-			useLegacyTableFactory(env, tEnv);
+			collected = useLegacyTableFactory(env, tEnv);
 		} else {
-			useDynamicTableFactory(env, tEnv);
+			collected = useDynamicTableFactory(env, tEnv);
 		}
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 
 		List<String> expected = new ArrayList<>();
 		expected.add("1,1,11-c1-v1,11-c2-v1");
@@ -162,11 +170,12 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 		expected.add("2,3,null,23-c2");
 		expected.add("2,5,25-c1,25-c2");
 		expected.add("3,8,38-c1,38-c2");
+		Collections.sort(expected);
 
-		StreamITCase.compareWithList(expected);
+		assertEquals(expected, result);
 	}
 
-	private void useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+	private Iterator<Row> useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
 		Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
 			new Tuple2<>(1, "1"),
 			new Tuple2<>(1, "1"),
@@ -195,13 +204,10 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 
 		String sqlQuery = "SELECT id1, id2, comment1, comment2 FROM T, " +
 			"LATERAL TABLE(jdbcLookup(id1, id2)) AS S(l_id1, l_id2, comment1, comment2)";
-		Table result = tEnv.sqlQuery(sqlQuery);
-		DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-		env.execute();
+		return tEnv.executeSql(sqlQuery).collect();
 	}
 
-	private void useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+	private Iterator<Row> useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
 		Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
 			new Tuple2<>(1, "1"),
 			new Tuple2<>(1, "1"),
@@ -229,9 +235,6 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 		String sqlQuery = "SELECT source.id1, source.id2, L.comment1, L.comment2 FROM T AS source " +
 			"JOIN lookup for system_time as of source.proctime AS L " +
 			"ON source.id1 = L.id1 and source.id2 = L.id2";
-		Table result = tEnv.sqlQuery(sqlQuery);
-		DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-		env.execute();
+		return tEnv.executeSql(sqlQuery).collect();
 	}
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
new file mode 100644
index 0000000..4efcb47
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/**
+ * Plan tests for JDBC connector, for example, testing projection push down.
+ */
+public class JdbcTablePlanTest extends TableTestBase {
+
+	private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+	@Test
+	public void testProjectionPushDown() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE jdbc (" +
+				"id BIGINT," +
+				"timestamp6_col TIMESTAMP(6)," +
+				"timestamp9_col TIMESTAMP(9)," +
+				"time_col TIME," +
+				"real_col FLOAT," +
+				"double_col DOUBLE," +
+				"decimal_col DECIMAL(10, 4)" +
+				") WITH (" +
+				"  'connector'='jdbc'," +
+				"  'url'='jdbc:derby:memory:test'," +
+				"  'table-name'='test_table'" +
+				")"
+		);
+		util.verifyPlan("SELECT decimal_col, timestamp9_col, id FROM jdbc");
+	}
+
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 277191c..8115696 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -44,7 +44,6 @@ import java.util.stream.StreamSupport;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
-
 /**
  * ITCase for {@link JdbcTableSource}.
  */
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
new file mode 100644
index 0000000..9219fc8
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col, timestamp9_col, id])
+]]>
+    </Resource>
+  </TestCase>
+</Root>