You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 02:12:52 UTC
[flink] 03/03: [FLINK-17798][connector/jdbc] Align the behavior
between the new and legacy JDBC table source
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 73520ca19e76d0895c38ec956250cb588eca740c
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:48:25 2020 +0800
[FLINK-17798][connector/jdbc] Align the behavior between the new and legacy JDBC table source
This closes #12221
---
flink-connectors/flink-connector-jdbc/pom.xml | 2 +-
.../jdbc/table/JdbcDynamicTableSource.java | 48 ++++++++++---------
.../table/JdbcDynamicTableSourceSinkFactory.java | 7 +--
.../jdbc/table/JdbcDynamicTableSourceITCase.java | 45 ++++++++++--------
...ctionITCase.java => JdbcLookupTableITCase.java} | 43 +++++++++--------
.../connector/jdbc/table/JdbcTablePlanTest.java | 54 ++++++++++++++++++++++
.../jdbc/table/JdbcTableSourceITCase.java | 1 -
.../connector/jdbc/table/JdbcTablePlanTest.xml | 35 ++++++++++++++
8 files changed, 168 insertions(+), 67 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index d259ce3..be107af 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -94,7 +94,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 248ffe1..21a80a2 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -32,37 +32,35 @@ import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
-import java.util.Arrays;
import java.util.Objects;
/**
* A {@link DynamicTableSource} for JDBC.
*/
@Internal
-public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource {
+public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
private final JdbcOptions options;
private final JdbcReadOptions readOptions;
private final JdbcLookupOptions lookupOptions;
- private final TableSchema schema;
- private final int[] selectFields;
+ private TableSchema physicalSchema;
private final String dialectName;
public JdbcDynamicTableSource(
JdbcOptions options,
JdbcReadOptions readOptions,
JdbcLookupOptions lookupOptions,
- TableSchema schema,
- int[] selectFields) {
+ TableSchema physicalSchema) {
this.options = options;
this.readOptions = readOptions;
this.lookupOptions = lookupOptions;
- this.schema = schema;
- this.selectFields = selectFields;
+ this.physicalSchema = physicalSchema;
this.dialectName = options.getDialect().dialectName();
}
@@ -74,15 +72,15 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
int[] innerKeyArr = context.getKeys()[i];
Preconditions.checkArgument(innerKeyArr.length == 1,
"JDBC only support non-nested look up keys");
- keyNames[i] = schema.getFieldNames()[innerKeyArr[0]];
+ keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
}
- final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+ final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
return TableFunctionProvider.of(new JdbcRowDataLookupFunction(
options,
lookupOptions,
- schema.getFieldNames(),
- schema.getFieldDataTypes(),
+ physicalSchema.getFieldNames(),
+ physicalSchema.getFieldDataTypes(),
keyNames,
rowType));
}
@@ -101,7 +99,7 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
}
final JdbcDialect dialect = options.getDialect();
String query = dialect.getSelectFromStatement(
- options.getTableName(), schema.getFieldNames(), new String[0]);
+ options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
@@ -113,10 +111,10 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
" BETWEEN ? AND ?";
}
builder.setQuery(query);
- final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+ final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
builder.setRowConverter(dialect.getRowConverter(rowType));
builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
- .createTypeInformation(schema.toRowDataType()));
+ .createTypeInformation(physicalSchema.toRowDataType()));
return InputFormatProvider.of(builder.build());
}
@@ -127,8 +125,19 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
}
@Override
+ public boolean supportsNestedProjection() {
+ // JDBC doesn't support nested projection
+ return false;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields) {
+ this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+ }
+
+ @Override
public DynamicTableSource copy() {
- return new JdbcDynamicTableSource(options, readOptions, lookupOptions, schema, selectFields);
+ return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
}
@Override
@@ -148,15 +157,12 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
return Objects.equals(options, that.options) &&
Objects.equals(readOptions, that.readOptions) &&
Objects.equals(lookupOptions, that.lookupOptions) &&
- Objects.equals(schema, that.schema) &&
- Arrays.equals(selectFields, that.selectFields) &&
+ Objects.equals(physicalSchema, that.physicalSchema) &&
Objects.equals(dialectName, that.dialectName);
}
@Override
public int hashCode() {
- int result = Objects.hash(options, readOptions, lookupOptions, schema, dialectName);
- result = 31 * result + Arrays.hashCode(selectFields);
- return result;
+ return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName);
}
}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
index 28a129d..930a1b0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
@@ -173,16 +173,11 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
helper.validate();
validateConfigOptions(config);
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
- int[] selectFields = new int[physicalSchema.getFieldNames().length];
- for (int i = 0; i < selectFields.length; i++) {
- selectFields[i] = i;
- }
return new JdbcDynamicTableSource(
getJdbcOptions(helper.getOptions()),
getJdbcReadOptions(helper.getOptions()),
getJdbcLookupOptions(helper.getOptions()),
- physicalSchema,
- selectFields);
+ physicalSchema);
}
private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index 6f93307..48be89e 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -22,10 +22,12 @@ import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -34,8 +36,12 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
/**
* ITCase for {@link JdbcDynamicTableSource}.
@@ -79,6 +85,7 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
Statement stat = conn.createStatement()) {
stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
}
+ StreamTestSink.clear();
}
@Test
@@ -106,16 +113,17 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
")"
);
- StreamITCase.clear();
- tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE), Row.class)
- .addSink(new StreamITCase.StringSink<>());
- env.execute();
-
+ Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
+ List<String> result = Lists.newArrayList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
List<String> expected =
- Arrays.asList(
+ Stream.of(
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
- "2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
- StreamITCase.compareWithList(expected);
+ "2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234")
+ .sorted().collect(Collectors.toList());
+ assertEquals(expected, result);
}
@Test
@@ -147,15 +155,16 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
")"
);
- StreamITCase.clear();
- tEnv.toAppendStream(tEnv.sqlQuery("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE), Row.class)
- .addSink(new StreamITCase.StringSink<>());
- env.execute();
-
+ Iterator<Row> collected = tEnv.executeSql("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE).collect();
+ List<String> result = Lists.newArrayList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
List<String> expected =
- Arrays.asList(
+ Stream.of(
"1,2020-01-01T15:35:00.123456,100.1234",
- "2,2020-01-01T15:36:01.123456,101.1234");
- StreamITCase.compareWithList(expected);
+ "2,2020-01-01T15:36:01.123456,101.1234")
+ .sorted().collect(Collectors.toList());
+ assertEquals(expected, result);
}
}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
similarity index 86%
rename from flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
rename to flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8d40cdd..793ea9d 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -22,17 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -46,16 +46,20 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertEquals;
/**
- * IT case for {@link JdbcLookupFunction}.
+ * IT case for lookup source of JDBC connector.
*/
@RunWith(Parameterized.class)
-public class JdbcLookupFunctionITCase extends AbstractTestBase {
+public class JdbcLookupTableITCase extends AbstractTestBase {
public static final String DB_URL = "jdbc:derby:memory:lookup";
public static final String LOOKUP_TABLE = "lookup_table";
@@ -63,7 +67,7 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
private final String tableFactory;
private final boolean useCache;
- public JdbcLookupFunctionITCase(String tableFactory, boolean useCache) {
+ public JdbcLookupTableITCase(String tableFactory, boolean useCache) {
this.useCache = useCache;
this.tableFactory = tableFactory;
}
@@ -143,16 +147,20 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
}
@Test
- public void test() throws Exception {
+ public void testLookup() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- StreamITCase.clear();
+ Iterator<Row> collected;
if ("legacyFactory".equals(tableFactory)) {
- useLegacyTableFactory(env, tEnv);
+ collected = useLegacyTableFactory(env, tEnv);
} else {
- useDynamicTableFactory(env, tEnv);
+ collected = useDynamicTableFactory(env, tEnv);
}
+ List<String> result = Lists.newArrayList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
List<String> expected = new ArrayList<>();
expected.add("1,1,11-c1-v1,11-c2-v1");
@@ -162,11 +170,12 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
expected.add("2,3,null,23-c2");
expected.add("2,5,25-c1,25-c2");
expected.add("3,8,38-c1,38-c2");
+ Collections.sort(expected);
- StreamITCase.compareWithList(expected);
+ assertEquals(expected, result);
}
- private void useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+ private Iterator<Row> useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
new Tuple2<>(1, "1"),
new Tuple2<>(1, "1"),
@@ -195,13 +204,10 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
String sqlQuery = "SELECT id1, id2, comment1, comment2 FROM T, " +
"LATERAL TABLE(jdbcLookup(id1, id2)) AS S(l_id1, l_id2, comment1, comment2)";
- Table result = tEnv.sqlQuery(sqlQuery);
- DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink<>());
- env.execute();
+ return tEnv.executeSql(sqlQuery).collect();
}
- private void useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+ private Iterator<Row> useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
new Tuple2<>(1, "1"),
new Tuple2<>(1, "1"),
@@ -229,9 +235,6 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
String sqlQuery = "SELECT source.id1, source.id2, L.comment1, L.comment2 FROM T AS source " +
"JOIN lookup for system_time as of source.proctime AS L " +
"ON source.id1 = L.id1 and source.id2 = L.id2";
- Table result = tEnv.sqlQuery(sqlQuery);
- DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink<>());
- env.execute();
+ return tEnv.executeSql(sqlQuery).collect();
}
}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
new file mode 100644
index 0000000..4efcb47
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/**
+ * Plan tests for JDBC connector, for example, testing projection push down.
+ */
+public class JdbcTablePlanTest extends TableTestBase {
+
+ private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+ @Test
+ public void testProjectionPushDown() {
+ util.tableEnv().executeSql(
+ "CREATE TABLE jdbc (" +
+ "id BIGINT," +
+ "timestamp6_col TIMESTAMP(6)," +
+ "timestamp9_col TIMESTAMP(9)," +
+ "time_col TIME," +
+ "real_col FLOAT," +
+ "double_col DOUBLE," +
+ "decimal_col DECIMAL(10, 4)" +
+ ") WITH (" +
+ " 'connector'='jdbc'," +
+ " 'url'='jdbc:derby:memory:test'," +
+ " 'table-name'='test_table'" +
+ ")"
+ );
+ util.verifyPlan("SELECT decimal_col, timestamp9_col, id FROM jdbc");
+ }
+
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 277191c..8115696 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -44,7 +44,6 @@ import java.util.stream.StreamSupport;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
-
/**
* ITCase for {@link JdbcTableSource}.
*/
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
new file mode 100644
index 0000000..9219fc8
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testProjectionPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col, timestamp9_col, id])
+]]>
+ </Resource>
+ </TestCase>
+</Root>