You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/31 10:59:06 UTC
[flink] branch master updated: [FLINK-17426][table-planner-blink]
Support the SupportsLimitPushDown interface for ScanTableSource
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3ab1a1c [FLINK-17426][table-planner-blink] Support the SupportsLimitPushDown interface for ScanTableSource
3ab1a1c is described below
commit 3ab1a1c66772320a0901ea085cfeaa6bf161bf9a
Author: Jacky Lau <li...@gmail.com>
AuthorDate: Fri Jul 31 18:57:26 2020 +0800
[FLINK-17426][table-planner-blink] Support the SupportsLimitPushDown interface for ScanTableSource
This closes #12964
---
.../logical/PushFilterIntoTableSourceScanRule.java | 13 +-
.../logical/PushLimitIntoTableSourceScanRule.java | 118 ++++
.../planner/plan/rules/FlinkBatchRuleSets.scala | 1 +
.../planner/plan/schema/CatalogSourceTable.scala | 1 -
.../planner/factories/TestValuesTableFactory.java | 26 +-
.../PushLimitIntoTableSourceScanRuleTest.java | 66 +++
.../org.apache.flink.table.factories.TableFactory | 2 +-
.../sql/{LimitTest.xml => LegacyLimitTest.xml} | 598 ++++++++++-----------
.../table/planner/plan/batch/sql/LimitTest.xml | 45 +-
.../PushLimitIntoLegacyTableSourceScanRuleTest.xml | 152 ++++++
.../PushLimitIntoTableSourceScanRuleTest.xml | 152 ++++++
.../sql/{LimitTest.scala => LegacyLimitTest.scala} | 42 +-
.../table/planner/plan/batch/sql/LimitTest.scala | 138 ++---
...ushLimitIntoLegacyTableSourceScanRuleTest.scala | 116 ++++
.../{LimitITCase.scala => LegacyLimitITCase.scala} | 8 +-
.../planner/runtime/batch/sql/LimitITCase.scala | 107 +---
.../{LimitITCase.scala => LegacyLimitITCase.scala} | 6 +-
.../planner/runtime/batch/table/LimitITCase.scala | 50 +-
....scala => TestLegacyLimitableTableSource.scala} | 10 +-
19 files changed, 1056 insertions(+), 595 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index b3a48ec..62cb42e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -50,7 +50,6 @@ import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import scala.Tuple2;
@@ -87,7 +86,7 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
// we can not push filter twice
return tableSourceTable != null
&& tableSourceTable.tableSource() instanceof SupportsFilterPushDown
- && !Arrays.stream(tableSourceTable.extraDigests()).anyMatch(str -> str.startsWith("filter=["));
+ && Arrays.stream(tableSourceTable.extraDigests()).noneMatch(str -> str.startsWith("filter=["));
}
@Override
@@ -150,7 +149,7 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
TableSourceTable newTableSourceTable = oldTableSourceTable.copy(
newTableSource,
getNewFlinkStatistic(oldTableSourceTable, originPredicatesSize, updatedPredicatesSize),
- getNewExtraDigests(oldTableSourceTable, result.getAcceptedFilters())
+ getNewExtraDigests(result.getAcceptedFilters())
);
TableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());
// check whether framework still need to do a filter
@@ -185,8 +184,7 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
return newStatistic;
}
- private String[] getNewExtraDigests(TableSourceTable tableSourceTable, List<ResolvedExpression> acceptedFilters) {
- String[] oldExtraDigests = tableSourceTable.extraDigests();
+ private String[] getNewExtraDigests(List<ResolvedExpression> acceptedFilters) {
String extraDigest = null;
if (!acceptedFilters.isEmpty()) {
// push filter successfully
@@ -200,9 +198,6 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
// try to push filter, but insuccess
extraDigest = "filter=[]";
}
- return Stream.concat(
- Arrays.stream(oldExtraDigests),
- Arrays.stream(new String[]{extraDigest}))
- .toArray(String[]::new);
+ return new String[]{extraDigest};
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java
new file mode 100644
index 0000000..4247f85
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexLiteral;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Planner rule that tries to push a limit into a {@link FlinkLogicalTableSourceScan}
+ * whose table is a {@link TableSourceTable} and whose table source is a {@link SupportsLimitPushDown}.
+ * The original limit is still retained after the push-down.
+ * The reasons why the limit is still retained:
+ * 1. If the source were required to return exactly the limited number of records, its
+ * implementation would be very demanding: the source would have to accurately control the record
+ * number of each split, and the parallelism setting would also need to be adjusted accordingly.
+ * 2. If the limit were removed, a filter might be pushed down to the source after the limit was
+ * pushed down. The source would then need to know that it must apply the limit first and the
+ * filter afterwards, which is hard to implement.
+ * 3. Limit with offset can be supported by pushing down offset + fetch to the table source.
+ */
+public class PushLimitIntoTableSourceScanRule extends RelOptRule {
+ public static final PushLimitIntoTableSourceScanRule INSTANCE = new PushLimitIntoTableSourceScanRule();
+
+ public PushLimitIntoTableSourceScanRule() {
+ super(operand(FlinkLogicalSort.class,
+ operand(FlinkLogicalTableSourceScan.class, none())),
+ "PushLimitIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Sort sort = call.rel(0);
+ TableSourceTable tableSourceTable = call.rel(1).getTable().unwrap(TableSourceTable.class);
+
+ // a limit can be pushed down only if it satisfies two conditions: 1) it has no order by keys, 2) it has a fetch (limit).
+ boolean onlyLimit = sort.getCollation().getFieldCollations().isEmpty() && sort.fetch != null;
+ return onlyLimit
+ && tableSourceTable != null
+ && tableSourceTable.tableSource() instanceof SupportsLimitPushDown
+ && Arrays.stream(tableSourceTable.extraDigests()).noneMatch(str -> str.startsWith("limit=["));
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Sort sort = call.rel(0);
+ FlinkLogicalTableSourceScan scan = call.rel(1);
+ TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+ int offset = sort.offset == null ? 0 : RexLiteral.intValue(sort.offset);
+ // widen to long before adding: offset + fetch can exceed Integer.MAX_VALUE and wrap to a negative limit
+ long limit = (long) offset + RexLiteral.intValue(sort.fetch);
+ TableSourceTable newTableSourceTable = applyLimit(limit, tableSourceTable);
+ // the Sort is copied on top of the new scan; only the scan's table is replaced
+ FlinkLogicalTableSourceScan newScan = FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable);
+ Sort newSort = sort.copy(sort.getTraitSet(), Collections.singletonList(newScan));
+ call.transformTo(newSort);
+ }
+
+ private TableSourceTable applyLimit(
+ long limit,
+ FlinkPreparingTableBase relOptTable) {
+ TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+ DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+ ((SupportsLimitPushDown) newTableSource).applyLimit(limit);
+
+ FlinkStatistic statistic = relOptTable.getStatistic();
+ long newRowCount = 0;
+ if (statistic.getRowCount() != null) {
+ newRowCount = Math.min(limit, statistic.getRowCount().longValue());
+ } else {
+ newRowCount = limit;
+ }
+ // update TableStats after limit push down
+ TableStats newTableStats = new TableStats(newRowCount);
+ FlinkStatistic newStatistic = FlinkStatistic.builder()
+ .statistic(statistic)
+ .tableStats(newTableStats)
+ .build();
+
+ // update extraDigests
+ String[] newExtraDigests = new String[]{"limit=[" + limit + "]"};
+
+ return oldTableSourceTable.copy(
+ newTableSource,
+ newStatistic,
+ newExtraDigests
+ );
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 6251b8e..f5663c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -90,6 +90,7 @@ object FlinkBatchRuleSets {
private val LIMIT_RULES: RuleSet = RuleSets.ofList(
//push down localLimit
+ PushLimitIntoTableSourceScanRule.INSTANCE,
PushLimitIntoLegacyTableSourceScanRule.INSTANCE)
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index e2160ea..6fe7dae 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -200,7 +200,6 @@ class CatalogSourceTable[T](
private def validateTableSource(tableSource: DynamicTableSource): Unit = {
// throw exception if unsupported ability interface is implemented
val unsupportedAbilities = List(
- classOf[SupportsLimitPushDown],
classOf[SupportsPartitionPushDown],
classOf[SupportsComputedColumnPushDown],
classOf[SupportsWatermarkPushDown])
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 34fb0f4..2a51167 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
@@ -278,7 +279,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
nestedProjectionSupported,
null,
null,
- filterableFieldsSet);
+ filterableFieldsSet,
+ Long.MAX_VALUE);
} else {
try {
return InstantiationUtil.instantiate(
@@ -359,7 +361,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
/**
* Values {@link DynamicTableSource} for testing.
*/
- private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
+ private static class TestValuesTableSource implements ScanTableSource,
+ LookupTableSource,
+ SupportsProjectionPushDown,
+ SupportsFilterPushDown,
+ SupportsLimitPushDown {
private TableSchema physicalSchema;
private final ChangelogMode changelogMode;
@@ -372,6 +378,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
private @Nullable int[] projectedFields;
private List<ResolvedExpression> filterPredicates;
private final Set<String> filterableFields;
+ private long limit;
private TestValuesTableSource(
TableSchema physicalSchema,
@@ -384,7 +391,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
boolean nestedProjectionSupported,
int[] projectedFields,
List<ResolvedExpression> filterPredicates,
- Set<String> filterableFields) {
+ Set<String> filterableFields,
+ long limit) {
this.physicalSchema = physicalSchema;
this.changelogMode = changelogMode;
this.bounded = bounded;
@@ -396,6 +404,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
this.projectedFields = projectedFields;
this.filterPredicates = filterPredicates;
this.filterableFields = filterableFields;
+ this.limit = limit;
}
@Override
@@ -610,7 +619,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
nestedProjectionSupported,
projectedFields,
filterPredicates,
- filterableFields);
+ filterableFields,
+ limit);
}
@Override
@@ -624,6 +634,9 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
DataStructureConverter converter) {
List<RowData> result = new ArrayList<>();
for (Row value : data) {
+ if (result.size() >= limit) {
+ return result;
+ }
if (isRetainedAfterApplyingFilterPredicates(value)) {
Row projectedRow;
if (projectedFields == null) {
@@ -644,6 +657,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
}
return result;
}
+
+ @Override
+ public void applyLimit(long limit) {
+ this.limit = limit;
+ }
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.java
new file mode 100644
index 0000000..442e1c5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.SortProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for {@link PushLimitIntoTableSourceScanRule}.
+ */
+public class PushLimitIntoTableSourceScanRuleTest extends PushLimitIntoLegacyTableSourceScanRuleTest {
+ @Override
+ public void setup() {
+ util().buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+ CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util().tableEnv().getConfig());
+ calciteConfig.getBatchProgram().get().addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(PushLimitIntoTableSourceScanRule.INSTANCE,
+ SortProjectTransposeRule.INSTANCE,
+ // converts calcite rel(RelNode) to flink rel(FlinkRelNode)
+ FlinkLogicalSort.BATCH_CONVERTER(),
+ FlinkLogicalTableSourceScan.CONVERTER()))
+ .build()
+ );
+
+ String ddl =
+ "CREATE TABLE LimitTable (\n" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c string\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'bounded' = 'true'\n" +
+ ")";
+ util().tableEnv().executeSql(ddl);
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index a1fde47..9a8051b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -28,7 +28,7 @@ org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil$DataTypeOutputFor
org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil$LegacyUnsafeMemoryAppendTableFactory
org.apache.flink.table.planner.utils.TestTableSourceFactory
org.apache.flink.table.planner.runtime.utils.InMemoryLookupableTableFactory
-org.apache.flink.table.planner.utils.TestLimitableTableSourceFactory
+org.apache.flink.table.planner.utils.TestLegacyLimitableTableSourceFactory
org.apache.flink.table.planner.utils.TestInputFormatTableSourceFactory
org.apache.flink.table.planner.utils.TestDataTypeTableSourceFactory
org.apache.flink.table.planner.utils.TestDataTypeTableSourceWithTimeFactory
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.xml
similarity index 97%
copy from flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
copy to flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.xml
index 656232e..d40063c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.xml
@@ -15,302 +15,302 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
-<Root>
- <TestCase name="testFetch0WithoutOffset">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 0 ROWS ONLY]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(fetch=[0])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Values(tuples=[[]], values=[a, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testFetchWithLimitSource">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testFetchWithOffset">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[10], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[20], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testFetchWithOffsetAndLimitSource">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[10], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[20], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testFetchWithoutOffset">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 10 ROWS ONLY]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimit0WithOffset">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 10]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[10], fetch=[0])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Values(tuples=[[]], values=[a, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimit0WithOffset0">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 0]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[0], fetch=[0])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Values(tuples=[[]], values=[a, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimit0WithoutOffset">
- <Resource name="sql">
- <![CDATA[SELECT * FROM MyTable LIMIT 0]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(fetch=[0])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Values(tuples=[[]], values=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimitWithLimitSource">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM LimitTable LIMIT 10]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testOrderByWithLimitSource">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimitWithOffset">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[1], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[1], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[11], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimitWithOffset0">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 0]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[0], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimitWithoutOffset">
- <Resource name="sql">
- <![CDATA[SELECT * FROM MyTable LIMIT 5]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(fetch=[5])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Limit(offset=[0], fetch=[5], global=[true])
-+- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[5], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testOnlyOffset">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[unlimited], global=[true])
- +- Exchange(distribution=[single])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLimitWithOffsetAndLimitSource">
- <Resource name="sql">
- <![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalSort(offset=[1], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[1], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[11], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
-</Root>
+<Root>
+ <TestCase name="testFetch0WithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 0 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[0])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]], values=[a, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFetchWithLimitSource">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[10], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFetchWithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[10], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[20], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFetchWithOffsetAndLimitSource">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[10], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[20], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFetchWithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[10], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimit0WithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10], fetch=[0])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]], values=[a, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimit0WithOffset0">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[0], fetch=[0])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]], values=[a, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimit0WithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable LIMIT 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]], values=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitWithLimitSource">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable LIMIT 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[10], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testOrderByWithLimitSource">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitWithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[1], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[11], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitWithOffset0">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[0], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[10], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitWithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable LIMIT 5]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[5])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Limit(offset=[0], fetch=[5], global=[true])
++- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[5], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testOnlyOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[10], fetch=[unlimited], global=[true])
+ +- Exchange(distribution=[single])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitWithOffsetAndLimitSource">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[1], fetch=[10], global=[true])
+ +- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[11], global=[false])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
index 656232e..a415295 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
@@ -46,11 +46,10 @@ LogicalSort(fetch=[10])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+Limit(offset=[0], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[10], global=[false])
+ +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[10]]], fields=[a, c])
]]>
</Resource>
</TestCase>
@@ -88,11 +87,10 @@ LogicalSort(offset=[10], fetch=[10])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[20], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
+Limit(offset=[10], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[20], global=[false])
+ +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[20]]], fields=[a, c])
]]>
</Resource>
</TestCase>
@@ -181,11 +179,10 @@ LogicalSort(fetch=[10])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+Limit(offset=[0], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[10], global=[false])
+ +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[10]]], fields=[a, c])
]]>
</Resource>
</TestCase>
@@ -202,11 +199,10 @@ LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+ +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
+ +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c]]], fields=[a, c])
]]>
</Resource>
</TestCase>
@@ -305,11 +301,10 @@ LogicalSort(offset=[1], fetch=[10])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[1], fetch=[10], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[11], global=[false])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
+Limit(offset=[1], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+ +- Limit(offset=[0], fetch=[11], global=[false])
+ +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[11]]], fields=[a, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.xml
new file mode 100644
index 0000000..b2c8cc3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCanPushdownLimitWithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable LIMIT 5]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[5])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[5])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 5]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushdownFetchWithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10], fetch=[10])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushdownFetchWithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[10])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotPushDownWithOrderBy">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(sort0=[$2], dir0=[ASC-nulls-first], fetch=[10])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushdownLimitWithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[1], fetch=[10])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotPushDownWithoutLimit">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable OFFSET 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotPushDownWithoutFetch">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.xml
new file mode 100644
index 0000000..ff75d79
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCanPushdownLimitWithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable LIMIT 5]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[5])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[5])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[5]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushdownFetchWithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10], fetch=[10])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[20]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushdownFetchWithoutOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[10])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[10]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotPushDownWithOrderBy">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(sort0=[$2], dir0=[ASC-nulls-first], fetch=[10])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanPushdownLimitWithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[1], fetch=[10])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[11]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotPushDownWithoutLimit">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable OFFSET 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCannotPushDownWithoutFetch">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.scala
similarity index 81%
copy from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.scala
index 788eb3a..88abf19 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.scala
@@ -18,25 +18,31 @@
package org.apache.flink.table.planner.plan.batch.sql
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{SqlParserException, TableSchema, _}
-import org.apache.flink.table.planner.utils.{TableTestBase, TestLimitableTableSource}
-
-import org.junit.Test
-
-class LimitTest extends TableTestBase {
-
- private val util = batchTestUtil()
- util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- TestLimitableTableSource.createTemporaryTable(
- util.tableEnv,
- Seq(),
- new TableSchema(
- Array("a", "b", "c"),
- Array[TypeInformation[_]](INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)),
- "LimitTable")
+import org.apache.flink.table.api.{SqlParserException, _}
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+class LegacyLimitTest extends TableTestBase {
+
+ protected val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val ddl =
+ s"""
+ |CREATE TABLE LimitTable (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector.type' = 'TestLimitableTableSource',
+ | 'is-bounded' = 'true'
+ |)
+ """.stripMargin
+ util.tableEnv.executeSql(ddl)
+ }
@Test
def testLimitWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
index 788eb3a..0e4ab2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
@@ -18,114 +18,36 @@
package org.apache.flink.table.planner.plan.batch.sql
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{SqlParserException, TableSchema, _}
-import org.apache.flink.table.planner.utils.{TableTestBase, TestLimitableTableSource}
-
-import org.junit.Test
-
-class LimitTest extends TableTestBase {
-
- private val util = batchTestUtil()
- util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- TestLimitableTableSource.createTemporaryTable(
- util.tableEnv,
- Seq(),
- new TableSchema(
- Array("a", "b", "c"),
- Array[TypeInformation[_]](INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)),
- "LimitTable")
-
- @Test
- def testLimitWithoutOffset(): Unit = {
- util.verifyPlan("SELECT * FROM MyTable LIMIT 5")
- }
-
- @Test
- def testLimit0WithoutOffset(): Unit = {
- util.verifyPlan("SELECT * FROM MyTable LIMIT 0")
- }
-
- @Test(expected = classOf[SqlParserException])
- def testNegativeLimitWithoutOffset(): Unit = {
- util.verifyPlan("SELECT * FROM MyTable LIMIT -1")
- }
-
- @Test
- def testLimitWithOffset(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable LIMIT 10 OFFSET 1")
- }
-
- @Test
- def testLimitWithOffset0(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable LIMIT 10 OFFSET 0")
- }
-
- @Test
- def testLimit0WithOffset0(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable LIMIT 0 OFFSET 0")
- }
-
- @Test
- def testLimit0WithOffset(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable LIMIT 0 OFFSET 10")
- }
-
- @Test(expected = classOf[SqlParserException])
- def testLimitWithNegativeOffset(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable LIMIT 10 OFFSET -1")
- }
-
- @Test
- def testFetchWithOffset(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY")
- }
-
- @Test
- def testFetchWithoutOffset(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable FETCH FIRST 10 ROWS ONLY")
- }
-
- @Test
- def testFetch0WithoutOffset(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable FETCH FIRST 0 ROWS ONLY")
- }
-
- @Test
- def testOnlyOffset(): Unit = {
- util.verifyPlan("SELECT a, c FROM MyTable OFFSET 10 ROWS")
- }
-
- @Test
- def testFetchWithLimitSource(): Unit = {
- val sqlQuery = "SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY"
- util.verifyPlan(sqlQuery)
- }
-
- @Test
- def testOrderByWithLimitSource(): Unit = {
- val sqlQuery = "SELECT a, c FROM LimitTable ORDER BY c LIMIT 10"
- util.verifyPlan(sqlQuery)
- }
-
- @Test
- def testLimitWithLimitSource(): Unit = {
- val sqlQuery = "SELECT a, c FROM LimitTable LIMIT 10"
- util.verifyPlan(sqlQuery)
- }
-
- @Test
- def testLimitWithOffsetAndLimitSource(): Unit = {
- val sqlQuery = "SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1"
- util.verifyPlan(sqlQuery)
- }
-
- @Test
- def testFetchWithOffsetAndLimitSource(): Unit = {
- val sqlQuery = "SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY"
- util.verifyPlan(sqlQuery)
+import org.apache.flink.table.api._
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
+
+/**
+ * The plans of the following unit tests in LimitTest.xml differ slightly from LegacyLimitTest.xml.
+ * Because the TestValuesTableSource has implemented [[SupportsProjectionPushDown]]
+ * while the TestLegacyLimitableTableSource doesn't.
+ * So the Calc has been pushed down to the scan.
+ * 1.testFetchWithOffsetAndLimitSource
+ * 2.testOrderByWithLimitSource
+ * 3.testLimitWithLimitSource
+ * 4.testLimitWithOffsetAndLimitSource
+ * 5.testFetchWithLimitSource
+ */
+class LimitTest extends LegacyLimitTest {
+
+ override def setup(): Unit = {
+ util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val ddl =
+ s"""
+ |CREATE TABLE LimitTable (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true'
+ |)
+ """.stripMargin
+ util.tableEnv.executeSql(ddl)
}
-
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.scala
new file mode 100644
index 0000000..566fe1b7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.SortProjectTransposeRule
+import org.apache.calcite.tools.RuleSets
+import org.apache.flink.table.api.SqlParserException
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan, FlinkLogicalSort}
+import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase}
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[PushLimitIntoLegacyTableSourceScanRule]].
+ */
+class PushLimitIntoLegacyTableSourceScanRuleTest extends TableTestBase {
+ protected val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+ val calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv.getConfig)
+ calciteConfig.getBatchProgram.get.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(PushLimitIntoLegacyTableSourceScanRule.INSTANCE,
+ SortProjectTransposeRule.INSTANCE,
+ // converts calcite rel(RelNode) to flink rel(FlinkRelNode)
+ FlinkLogicalSort.BATCH_CONVERTER,
+ FlinkLogicalLegacyTableSourceScan.CONVERTER))
+ .build()
+ )
+
+ val ddl =
+ s"""
+ |CREATE TABLE LimitTable (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector.type' = 'TestLimitableTableSource',
+ | 'is-bounded' = 'true'
+ |)
+ """.stripMargin
+ util.tableEnv.executeSql(ddl)
+ }
+
+ @Test(expected = classOf[SqlParserException])
+ def testLimitWithNegativeOffset(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 10 OFFSET -1")
+ }
+
+ @Test(expected = classOf[SqlParserException])
+ def testNegativeLimitWithoutOffset(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable LIMIT -1")
+ }
+
+ @Test(expected = classOf[SqlParserException])
+ def testMysqlLimit(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 1, 10")
+ }
+
+ @Test
+ def testCanPushdownLimitWithoutOffset(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 5")
+ }
+
+ @Test
+ def testCanPushdownLimitWithOffset(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1")
+ }
+
+ @Test
+ def testCanPushdownFetchWithOffset(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY")
+ }
+
+ @Test
+ def testCanPushdownFetchWithoutOffset(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY")
+ }
+
+ @Test
+ def testCannotPushDownWithoutLimit(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable OFFSET 10")
+ }
+
+ @Test
+ def testCannotPushDownWithoutFetch(): Unit = {
+ util.verifyPlan("SELECT a, c FROM LimitTable OFFSET 10 ROWS")
+ }
+
+ @Test
+ def testCannotPushDownWithOrderBy(): Unit = {
+ val sqlQuery = "SELECT a, c FROM LimitTable ORDER BY c LIMIT 10"
+ util.verifyPlan(sqlQuery)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
similarity index 92%
copy from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
index 1f49b67..a6273e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
@@ -19,19 +19,21 @@
package org.apache.flink.table.planner.runtime.batch.sql
import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
+import org.apache.flink.table.planner.utils.TestLegacyLimitableTableSource
+
import org.junit._
-class LimitITCase extends BatchTestBase {
+class LegacyLimitITCase extends BatchTestBase {
@Before
override def before(): Unit = {
super.before()
registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
- TestLimitableTableSource.createTemporaryTable(
+ TestLegacyLimitableTableSource.createTemporaryTable(
tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
index 1f49b67..8b87824 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
@@ -18,97 +18,28 @@
package org.apache.flink.table.planner.runtime.batch.sql
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.planner.runtime.utils.BatchTestBase
-import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
-import org.junit._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.TestData.{data3, nullablesOfData3, type3}
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
-class LimitITCase extends BatchTestBase {
-
- @Before
+class LimitITCase extends LegacyLimitITCase {
override def before(): Unit = {
- super.before()
+ BatchTestBase.configForMiniCluster(conf)
registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
- TestLimitableTableSource.createTemporaryTable(
- tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
- }
-
- @Test
- def testOffsetAndFetch(): Unit = {
- checkSize(
- "SELECT * FROM Table3 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY",
- 5)
- }
-
- @Test
- def testOffsetAndLimit(): Unit = {
- checkSize(
- "SELECT * FROM Table3 LIMIT 10 OFFSET 2",
- 10)
- }
-
- @Test
- def testFetch(): Unit = {
- checkSize(
- "SELECT * FROM Table3 FETCH NEXT 10 ROWS ONLY",
- 10)
- }
-
- @Test
- def testFetchWithLimitTable(): Unit = {
- checkSize(
- "SELECT * FROM LimitTable FETCH NEXT 10 ROWS ONLY",
- 10)
- }
-
- @Test
- def testFetchFirst(): Unit = {
- checkSize(
- "SELECT * FROM Table3 FETCH FIRST 10 ROWS ONLY",
- 10)
- }
-
- @Test
- def testFetchFirstWithLimitTable(): Unit = {
- checkSize(
- "SELECT * FROM LimitTable FETCH FIRST 10 ROWS ONLY",
- 10)
- }
-
- @Test
- def testLimit(): Unit = {
- checkSize(
- "SELECT * FROM Table3 LIMIT 5",
- 5)
- }
-
- @Test
- def testLimit0WithLimitTable(): Unit = {
- checkSize(
- "SELECT * FROM LimitTable LIMIT 0",
- 0)
- }
-
- @Test
- def testLimitWithLimitTable(): Unit = {
- checkSize(
- "SELECT * FROM LimitTable LIMIT 5",
- 5)
- }
-
- @Test
- def testLessThanOffset(): Unit = {
- checkSize(
- "SELECT * FROM Table3 OFFSET 2 ROWS FETCH NEXT 50 ROWS ONLY",
- 19)
- }
-
- @Test
- def testLessThanOffsetWithLimitSource(): Unit = {
- checkSize(
- "SELECT * FROM LimitTable OFFSET 2 ROWS FETCH NEXT 50 ROWS ONLY",
- 19)
+ val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
+ val ddl =
+ s"""
+ |CREATE TABLE LimitTable (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$myTableDataId',
+ | 'bounded' = 'true'
+ |)
+ """.stripMargin
+ tEnv.executeSql(ddl)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LegacyLimitITCase.scala
similarity index 90%
copy from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LegacyLimitITCase.scala
index 9562a40..a6e310a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LegacyLimitITCase.scala
@@ -21,17 +21,17 @@ package org.apache.flink.table.planner.runtime.batch.table
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
+import org.apache.flink.table.planner.utils.TestLegacyLimitableTableSource
import org.junit.Assert.assertEquals
import org.junit._
-class LimitITCase extends BatchTestBase {
+class LegacyLimitITCase extends BatchTestBase {
@Before
override def before(): Unit = {
super.before()
- TestLimitableTableSource.createTemporaryTable(
+ TestLegacyLimitableTableSource.createTemporaryTable(
tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
index 9562a40..ccda349 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
@@ -18,41 +18,29 @@
package org.apache.flink.table.planner.runtime.batch.table
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.planner.runtime.utils.BatchTestBase
-import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
-import org.junit.Assert.assertEquals
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
import org.junit._
-class LimitITCase extends BatchTestBase {
+class LimitITCase extends LegacyLimitITCase {
@Before
override def before(): Unit = {
- super.before()
-
- TestLimitableTableSource.createTemporaryTable(
- tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
- }
-
- @Test
- def testFetch(): Unit = {
- assertEquals(
- executeQuery(tEnv.from("LimitTable").fetch(5)).size,
- 5)
- }
-
- @Test
- def testOffset(): Unit = {
- assertEquals(
- executeQuery(tEnv.from("LimitTable").offset(5)).size,
- 16)
- }
-
- @Test
- def testOffsetAndFetch(): Unit = {
- assertEquals(
- executeQuery(tEnv.from("LimitTable").limit(5, 5)).size,
- 5)
+ BatchTestBase.configForMiniCluster(conf)
+
+ val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
+ val ddl =
+ s"""
+ |CREATE TABLE LimitTable (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$myTableDataId',
+ | 'bounded' = 'true'
+ |)
+ """.stripMargin
+ tEnv.executeSql(ddl)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLegacyLimitableTableSource.scala
similarity index 94%
rename from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
rename to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLegacyLimitableTableSource.scala
index 81e6e46..211bf55 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLegacyLimitableTableSource.scala
@@ -41,7 +41,7 @@ import scala.collection.JavaConverters._
/**
* The table source which support push-down the limit to the source.
*/
-class TestLimitableTableSource(
+class TestLegacyLimitableTableSource(
data: Seq[Row],
rowType: RowTypeInfo,
var limit: Long = -1,
@@ -66,7 +66,7 @@ class TestLimitableTableSource(
}
override def applyLimit(limit: Long): TableSource[Row] = {
- new TestLimitableTableSource(data, rowType, limit, limitablePushedDown)
+ new TestLegacyLimitableTableSource(data, rowType, limit, limitablePushedDown)
}
override def isLimitPushedDown: Boolean = limitablePushedDown
@@ -84,7 +84,7 @@ class TestLimitableTableSource(
override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(rowType)
}
-object TestLimitableTableSource {
+object TestLegacyLimitableTableSource {
def createTemporaryTable(
tEnv: TableEnvironment,
data: Seq[Row],
@@ -100,7 +100,7 @@ object TestLimitableTableSource {
}
}
-class TestLimitableTableSourceFactory extends StreamTableSourceFactory[Row] {
+class TestLegacyLimitableTableSourceFactory extends StreamTableSourceFactory[Row] {
override def createStreamTableSource(
properties: util.Map[String, String]): StreamTableSource[Row] = {
val dp = new DescriptorProperties
@@ -117,7 +117,7 @@ class TestLimitableTableSourceFactory extends StreamTableSourceFactory[Row] {
val limit = dp.getOptionalLong("limit").orElse(-1L)
val limitablePushedDown = dp.getOptionalBoolean("limitable-push-down").orElse(false)
- new TestLimitableTableSource(
+ new TestLegacyLimitableTableSource(
data, tableSchema.toRowType.asInstanceOf[RowTypeInfo], limit, limitablePushedDown)
}