Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/31 10:59:06 UTC

[flink] branch master updated: [FLINK-17426][table-planner-blink] Support the SupportsLimitPushDown interface for ScanTableSource

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ab1a1c  [FLINK-17426][table-planner-blink] Support the SupportsLimitPushDown interface for ScanTableSource
3ab1a1c is described below

commit 3ab1a1c66772320a0901ea085cfeaa6bf161bf9a
Author: Jacky Lau <li...@gmail.com>
AuthorDate: Fri Jul 31 18:57:26 2020 +0800

    [FLINK-17426][table-planner-blink] Support the SupportsLimitPushDown interface for ScanTableSource
    
    This closes #12964
---
 .../logical/PushFilterIntoTableSourceScanRule.java |  13 +-
 .../logical/PushLimitIntoTableSourceScanRule.java  | 118 ++++
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   1 +
 .../planner/plan/schema/CatalogSourceTable.scala   |   1 -
 .../planner/factories/TestValuesTableFactory.java  |  26 +-
 .../PushLimitIntoTableSourceScanRuleTest.java      |  66 +++
 .../org.apache.flink.table.factories.TableFactory  |   2 +-
 .../sql/{LimitTest.xml => LegacyLimitTest.xml}     | 598 ++++++++++-----------
 .../table/planner/plan/batch/sql/LimitTest.xml     |  45 +-
 .../PushLimitIntoLegacyTableSourceScanRuleTest.xml | 152 ++++++
 .../PushLimitIntoTableSourceScanRuleTest.xml       | 152 ++++++
 .../sql/{LimitTest.scala => LegacyLimitTest.scala} |  42 +-
 .../table/planner/plan/batch/sql/LimitTest.scala   | 138 ++---
 ...ushLimitIntoLegacyTableSourceScanRuleTest.scala | 116 ++++
 .../{LimitITCase.scala => LegacyLimitITCase.scala} |   8 +-
 .../planner/runtime/batch/sql/LimitITCase.scala    | 107 +---
 .../{LimitITCase.scala => LegacyLimitITCase.scala} |   6 +-
 .../planner/runtime/batch/table/LimitITCase.scala  |  50 +-
 ....scala => TestLegacyLimitableTableSource.scala} |  10 +-
 19 files changed, 1056 insertions(+), 595 deletions(-)
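
For context, a minimal sketch (not part of this commit) of a connector that
opts into the new rule by implementing SupportsLimitPushDown on its
ScanTableSource. The planner calls applyLimit(offset + fetch) and keeps the
Limit operator, so the pushed value is an upper bound, not an exact row count.
Everything below except the Flink connector interfaces is hypothetical:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public class LimitableDemoSource implements ScanTableSource, SupportsLimitPushDown {

	// Long.MAX_VALUE means "no limit pushed down yet".
	private long limit = Long.MAX_VALUE;

	@Override
	public void applyLimit(long limit) {
		// called by PushLimitIntoTableSourceScanRule with offset + fetch
		this.limit = limit;
	}

	@Override
	public ChangelogMode getChangelogMode() {
		return ChangelogMode.insertOnly();
	}

	@Override
	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
		// a bounded source that stops emitting once the limit is reached
		return SourceFunctionProvider.of(new BoundedLongs(limit), true);
	}

	@Override
	public DynamicTableSource copy() {
		LimitableDemoSource copy = new LimitableDemoSource();
		copy.limit = this.limit; // pushed-down state must survive copy()
		return copy;
	}

	@Override
	public String asSummaryString() {
		return "LimitableDemoSource(limit=" + limit + ")";
	}

	private static final class BoundedLongs implements SourceFunction<RowData> {
		private final long max;
		private volatile boolean running = true;

		private BoundedLongs(long max) {
			this.max = max;
		}

		@Override
		public void run(SourceContext<RowData> ctx) {
			// emitting fewer rows than the bound is fine: the retained Limit
			// node trims the final result anyway
			for (long i = 0; running && i < max; i++) {
				ctx.collect(GenericRowData.of(i));
			}
		}

		@Override
		public void cancel() {
			running = false;
		}
	}
}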

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index b3a48ec..62cb42e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -50,7 +50,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import scala.Tuple2;
 
@@ -87,7 +86,7 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
 		// we cannot push the filter twice
 		return tableSourceTable != null
 			&& tableSourceTable.tableSource() instanceof SupportsFilterPushDown
-			&& !Arrays.stream(tableSourceTable.extraDigests()).anyMatch(str -> str.startsWith("filter=["));
+			&& Arrays.stream(tableSourceTable.extraDigests()).noneMatch(str -> str.startsWith("filter=["));
 	}
 
 	@Override
@@ -150,7 +149,7 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
 		TableSourceTable newTableSourceTable = oldTableSourceTable.copy(
 			newTableSource,
 			getNewFlinkStatistic(oldTableSourceTable, originPredicatesSize, updatedPredicatesSize),
-			getNewExtraDigests(oldTableSourceTable, result.getAcceptedFilters())
+			getNewExtraDigests(result.getAcceptedFilters())
 		);
 		TableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());
 		// check whether the framework still needs to apply a filter
@@ -185,8 +184,7 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
 		return newStatistic;
 	}
 
-	private String[] getNewExtraDigests(TableSourceTable tableSourceTable, List<ResolvedExpression> acceptedFilters) {
-		String[] oldExtraDigests = tableSourceTable.extraDigests();
+	private String[] getNewExtraDigests(List<ResolvedExpression> acceptedFilters) {
 		String extraDigest = null;
 		if (!acceptedFilters.isEmpty()) {
 			// push filter successfully
@@ -200,9 +198,6 @@ public class PushFilterIntoTableSourceScanRule extends RelOptRule {
 			// tried to push the filter, but without success
 			extraDigest = "filter=[]";
 		}
-		return Stream.concat(
-				Arrays.stream(oldExtraDigests),
-				Arrays.stream(new String[]{extraDigest}))
-			.toArray(String[]::new);
+		return new String[]{extraDigest};
 	}
 }
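
Note: after this change each push-down rule returns only its own digest
("filter=[...]" here, "limit=[...]" in the new rule below); TableSourceTable.copy()
presumably appends it to the scan's existing digests now, which is why the
expected plans later in this patch show combined digests such as
"LimitTable, project=[a, c], limit=[10]". The startsWith() guard in matches()
then keeps each rule from firing twice on the same scan.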
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java
new file mode 100644
index 0000000..4247f85
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexLiteral;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Planner rule that tries to push a limit into a {@link FlinkLogicalTableSourceScan}
+ * whose table is a {@link TableSourceTable} and whose table source implements
+ * {@link SupportsLimitPushDown}. The original limit operator is still retained, for
+ * three reasons:
+ * 1. If the source had to return exactly the limited number of records, the bar for
+ * implementations would be high: the source would need to control the record count of
+ * each split precisely, and the parallelism would have to be adjusted accordingly.
+ * 2. If the limit were removed, a filter might later be pushed into the source after
+ * the limit. The source would then have to apply the limit first and the filter
+ * afterwards, which is hard to implement.
+ * 3. Limit with offset is supported by pushing offset + fetch down to the source.
+ */
+public class PushLimitIntoTableSourceScanRule extends RelOptRule {
+	public static final PushLimitIntoTableSourceScanRule INSTANCE = new PushLimitIntoTableSourceScanRule();
+
+	public PushLimitIntoTableSourceScanRule() {
+		super(operand(FlinkLogicalSort.class,
+			operand(FlinkLogicalTableSourceScan.class, none())),
+			"PushLimitIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		Sort sort = call.rel(0);
+		TableSourceTable tableSourceTable = call.rel(1).getTable().unwrap(TableSourceTable.class);
+
+		// a limit can be pushed down only if two conditions hold: 1) there are no order-by keys, 2) a fetch (limit) is present.
+		boolean onlyLimit = sort.getCollation().getFieldCollations().isEmpty() && sort.fetch != null;
+		return onlyLimit
+			&& tableSourceTable != null
+			&& tableSourceTable.tableSource() instanceof SupportsLimitPushDown
+			&& Arrays.stream(tableSourceTable.extraDigests()).noneMatch(str -> str.startsWith("limit=["));
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Sort sort = call.rel(0);
+		FlinkLogicalTableSourceScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		int offset = sort.offset == null ? 0 : RexLiteral.intValue(sort.offset);
+		int limit = offset + RexLiteral.intValue(sort.fetch);
+
+		TableSourceTable newTableSourceTable = applyLimit(limit, tableSourceTable);
+
+		FlinkLogicalTableSourceScan newScan = FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable);
+		Sort newSort = sort.copy(sort.getTraitSet(), Collections.singletonList(newScan));
+		call.transformTo(newSort);
+	}
+
+	private TableSourceTable applyLimit(
+		long limit,
+		FlinkPreparingTableBase relOptTable) {
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		((SupportsLimitPushDown) newTableSource).applyLimit(limit);
+
+		FlinkStatistic statistic = relOptTable.getStatistic();
+		final long newRowCount;
+		if (statistic.getRowCount() != null) {
+			newRowCount = Math.min(limit, statistic.getRowCount().longValue());
+		} else {
+			newRowCount = limit;
+		}
+		// update TableStats after limit push down
+		TableStats newTableStats = new TableStats(newRowCount);
+		FlinkStatistic newStatistic = FlinkStatistic.builder()
+			.statistic(statistic)
+			.tableStats(newTableStats)
+			.build();
+
+		// update extraDigests
+		String[] newExtraDigests = new String[]{"limit=[" + limit + "]"};
+
+		return oldTableSourceTable.copy(
+			newTableSource,
+			newStatistic,
+			newExtraDigests
+		);
+	}
+}
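
For illustration (not part of the patch): given "SELECT a, c FROM LimitTable
LIMIT 10 OFFSET 1", onMatch() sees offset = 1 and fetch = 10, so the source
receives applyLimit(11) while the Sort is kept; the retained
Limit(offset=[1], fetch=[10]) later drops the first row. This is exactly the
"limit=[11]" digest visible in the expected plans further down.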
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 6251b8e..f5663c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -90,6 +90,7 @@ object FlinkBatchRuleSets {
 
   private val LIMIT_RULES: RuleSet = RuleSets.ofList(
     //push down localLimit
+    PushLimitIntoTableSourceScanRule.INSTANCE,
     PushLimitIntoLegacyTableSourceScanRule.INSTANCE)
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index e2160ea..6fe7dae 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -200,7 +200,6 @@ class CatalogSourceTable[T](
   private def validateTableSource(tableSource: DynamicTableSource): Unit = {
     // throw exception if unsupported ability interface is implemented
     val unsupportedAbilities = List(
-      classOf[SupportsLimitPushDown],
       classOf[SupportsPartitionPushDown],
       classOf[SupportsComputedColumnPushDown],
       classOf[SupportsWatermarkPushDown])
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 34fb0f4..2a51167 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.CallExpression;
@@ -278,7 +279,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 				nestedProjectionSupported,
 				null,
 				null,
-				filterableFieldsSet);
+				filterableFieldsSet,
+				Long.MAX_VALUE);
 		} else {
 			try {
 				return InstantiationUtil.instantiate(
@@ -359,7 +361,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
+	private static class TestValuesTableSource implements ScanTableSource,
+		LookupTableSource,
+		SupportsProjectionPushDown,
+		SupportsFilterPushDown,
+		SupportsLimitPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
@@ -372,6 +378,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		private @Nullable int[] projectedFields;
 		private List<ResolvedExpression> filterPredicates;
 		private final Set<String> filterableFields;
+		private long limit;
 
 		private TestValuesTableSource(
 				TableSchema physicalSchema,
@@ -384,7 +391,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 				boolean nestedProjectionSupported,
 				int[] projectedFields,
 				List<ResolvedExpression> filterPredicates,
-				Set<String> filterableFields) {
+				Set<String> filterableFields,
+				long limit) {
 			this.physicalSchema = physicalSchema;
 			this.changelogMode = changelogMode;
 			this.bounded = bounded;
@@ -396,6 +404,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 			this.projectedFields = projectedFields;
 			this.filterPredicates = filterPredicates;
 			this.filterableFields = filterableFields;
+			this.limit = limit;
 		}
 
 		@Override
@@ -610,7 +619,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 				nestedProjectionSupported,
 				projectedFields,
 				filterPredicates,
-				filterableFields);
+				filterableFields,
+				limit);
 		}
 
 		@Override
@@ -624,6 +634,9 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 				DataStructureConverter converter) {
 			List<RowData> result = new ArrayList<>();
 			for (Row value : data) {
+				if (result.size() >= limit) {
+					return result;
+				}
 				if (isRetainedAfterApplyingFilterPredicates(value)) {
 					Row projectedRow;
 					if (projectedFields == null) {
@@ -644,6 +657,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 			}
 			return result;
 		}
+
+		@Override
+		public void applyLimit(long limit) {
+			this.limit = limit;
+		}
 	}
 
 	/**
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.java
new file mode 100644
index 0000000..442e1c5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.SortProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for {@link PushLimitIntoTableSourceScanRule}.
+ */
+public class PushLimitIntoTableSourceScanRuleTest extends PushLimitIntoLegacyTableSourceScanRuleTest {
+	@Override
+	public void setup() {
+		util().buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+		CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util().tableEnv().getConfig());
+		calciteConfig.getBatchProgram().get().addLast(
+			"rules",
+			FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+				.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+				.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+				.add(RuleSets.ofList(PushLimitIntoTableSourceScanRule.INSTANCE,
+					SortProjectTransposeRule.INSTANCE,
+					// converts calcite rel(RelNode) to flink rel(FlinkRelNode)
+					FlinkLogicalSort.BATCH_CONVERTER(),
+					FlinkLogicalTableSourceScan.CONVERTER()))
+				.build()
+		);
+
+		String ddl =
+			"CREATE TABLE LimitTable (\n" +
+				"  a int,\n" +
+				"  b bigint,\n" +
+				"  c string\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'bounded' = 'true'\n" +
+				")";
+		util().tableEnv().executeSql(ddl);
+	}
+}
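
A note on usage (a sketch, assuming the verifyPlan utility of the inherited
test base): each case in the legacy base class then only needs to hand a query
to the test util and diff the optimized plan against the expected XML, e.g.

	@Test
	public void testCanPushdownLimitWithoutOffset() {
		// expects the scan digest to contain limit=[5] once the rule fired
		util().verifyPlan("SELECT a, c FROM LimitTable LIMIT 5");
	}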
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index a1fde47..9a8051b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -28,7 +28,7 @@ org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil$DataTypeOutputFor
 org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil$LegacyUnsafeMemoryAppendTableFactory
 org.apache.flink.table.planner.utils.TestTableSourceFactory
 org.apache.flink.table.planner.runtime.utils.InMemoryLookupableTableFactory
-org.apache.flink.table.planner.utils.TestLimitableTableSourceFactory
+org.apache.flink.table.planner.utils.TestLegacyLimitableTableSourceFactory
 org.apache.flink.table.planner.utils.TestInputFormatTableSourceFactory
 org.apache.flink.table.planner.utils.TestDataTypeTableSourceFactory
 org.apache.flink.table.planner.utils.TestDataTypeTableSourceWithTimeFactory
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.xml
similarity index 97%
copy from flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
copy to flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.xml
index 656232e..d40063c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.xml
@@ -15,302 +15,302 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 -->
-<Root>
-  <TestCase name="testFetch0WithoutOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 0 ROWS ONLY]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(fetch=[0])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Values(tuples=[[]], values=[a, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testFetchWithLimitSource">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testFetchWithOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[10], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[20], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testFetchWithOffsetAndLimitSource">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[10], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[20], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testFetchWithoutOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 10 ROWS ONLY]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimit0WithOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 10]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[10], fetch=[0])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Values(tuples=[[]], values=[a, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimit0WithOffset0">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 0]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[0], fetch=[0])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Values(tuples=[[]], values=[a, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimit0WithoutOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable LIMIT 0]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(fetch=[0])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Values(tuples=[[]], values=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimitWithLimitSource">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM LimitTable LIMIT 10]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testOrderByWithLimitSource">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimitWithOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[1], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[1], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[11], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimitWithOffset0">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 0]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[0], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimitWithoutOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable LIMIT 5]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(fetch=[5])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Limit(offset=[0], fetch=[5], global=[true])
-+- Exchange(distribution=[single])
-   +- Limit(offset=[0], fetch=[5], global=[false])
-      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testOnlyOffset">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[unlimited], global=[true])
-   +- Exchange(distribution=[single])
-      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLimitWithOffsetAndLimitSource">
-    <Resource name="sql">
-      <![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalSort(offset=[1], fetch=[10])
-+- LogicalProject(a=[$0], c=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[1], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[11], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-</Root>
+<Root>
+  <TestCase name="testFetch0WithoutOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 0 ROWS ONLY]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(fetch=[0])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]], values=[a, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFetchWithLimitSource">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[10], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFetchWithOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[10], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[20], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFetchWithOffsetAndLimitSource">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[10], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[20], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFetchWithoutOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable FETCH FIRST 10 ROWS ONLY]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[10], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimit0WithOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 10]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[10], fetch=[0])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]], values=[a, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimit0WithOffset0">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable LIMIT 0 OFFSET 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[0], fetch=[0])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]], values=[a, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimit0WithoutOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable LIMIT 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]], values=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitWithLimitSource">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM LimitTable LIMIT 10]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[10], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOrderByWithLimitSource">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitWithOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[1], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[11], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitWithOffset0">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable LIMIT 10 OFFSET 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[0], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[0], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[10], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitWithoutOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable LIMIT 5]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(fetch=[5])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Limit(offset=[0], fetch=[5], global=[true])
++- Exchange(distribution=[single])
+   +- Limit(offset=[0], fetch=[5], global=[false])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnlyOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM MyTable OFFSET 10 ROWS]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[10], fetch=[unlimited], global=[true])
+   +- Exchange(distribution=[single])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitWithOffsetAndLimitSource">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, c])
++- Limit(offset=[1], fetch=[10], global=[true])
+   +- Exchange(distribution=[single])
+      +- Limit(offset=[0], fetch=[11], global=[false])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
index 656232e..a415295 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
@@ -46,11 +46,10 @@ LogicalSort(fetch=[10])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+Limit(offset=[0], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+   +- Limit(offset=[0], fetch=[10], global=[false])
+      +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[10]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -88,11 +87,10 @@ LogicalSort(offset=[10], fetch=[10])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[20], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
+Limit(offset=[10], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+   +- Limit(offset=[0], fetch=[20], global=[false])
+      +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[20]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -181,11 +179,10 @@ LogicalSort(fetch=[10])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+Limit(offset=[0], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+   +- Limit(offset=[0], fetch=[10], global=[false])
+      +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[10]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -202,11 +199,10 @@ LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+   +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[10], global=[false])
+      +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -305,11 +301,10 @@ LogicalSort(offset=[1], fetch=[10])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[1], fetch=[10], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[11], global=[false])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
+Limit(offset=[1], fetch=[10], global=[true])
++- Exchange(distribution=[single])
+   +- Limit(offset=[0], fetch=[11], global=[false])
+      +- TableSourceScan(table=[[default_catalog, default_database, LimitTable, project=[a, c], limit=[11]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.xml
new file mode 100644
index 0000000..b2c8cc3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+	<TestCase name="testCanPushdownLimitWithoutOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable LIMIT 5]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(fetch=[5])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[5])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 5]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCanPushdownFetchWithOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10], fetch=[10])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 20]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCanPushdownFetchWithoutOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[10])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 10]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCannotPushDownWithOrderBy">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(sort0=[$2], dir0=[ASC-nulls-first], fetch=[10])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCanPushdownLimitWithOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[1], fetch=[10])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable, source: [limit: 11]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCannotPushDownWithoutLimit">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable OFFSET 10]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCannotPushDownWithoutFetch">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.xml
new file mode 100644
index 0000000..ff75d79
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRuleTest.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+	<TestCase name="testCanPushdownLimitWithoutOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable LIMIT 5]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(fetch=[5])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[5])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[5]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCanPushdownFetchWithOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[10], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10], fetch=[10])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[20]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCanPushdownFetchWithoutOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(fetch=[10])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[10]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCannotPushDownWithOrderBy">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable ORDER BY c LIMIT 10]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(sort0=[$2], dir0=[ASC-nulls-first], fetch=[10])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCanPushdownLimitWithOffset">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[1], fetch=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[1], fetch=[10])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable, limit=[11]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCannotPushDownWithoutLimit">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable OFFSET 10]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testCannotPushDownWithoutFetch">
+		<Resource name="sql">
+			<![CDATA[SELECT a, c FROM LimitTable OFFSET 10 ROWS]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalSort(offset=[10])
++- LogicalProject(a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, LimitTable]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- FlinkLogicalSort(offset=[10])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LimitTable]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.scala
similarity index 81%
copy from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.scala
index 788eb3a..88abf19 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyLimitTest.scala
@@ -18,25 +18,31 @@
 
 package org.apache.flink.table.planner.plan.batch.sql
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{SqlParserException, TableSchema, _}
-import org.apache.flink.table.planner.utils.{TableTestBase, TestLimitableTableSource}
-
-import org.junit.Test
-
-class LimitTest extends TableTestBase {
-
-  private val util = batchTestUtil()
-  util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-  TestLimitableTableSource.createTemporaryTable(
-    util.tableEnv,
-    Seq(),
-    new TableSchema(
-      Array("a", "b", "c"),
-      Array[TypeInformation[_]](INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)),
-    "LimitTable")
+import org.apache.flink.table.api.{SqlParserException, _}
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+class LegacyLimitTest extends TableTestBase {
+
+  protected val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val ddl =
+      s"""
+         |CREATE TABLE LimitTable (
+         |  a int,
+         |  b bigint,
+         |  c string
+         |) WITH (
+         |  'connector.type' = 'TestLimitableTableSource',
+         |  'is-bounded' = 'true'
+         |)
+       """.stripMargin
+    util.tableEnv.executeSql(ddl)
+  }
 
   @Test
   def testLimitWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
index 788eb3a..0e4ab2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
@@ -18,114 +18,36 @@
 
 package org.apache.flink.table.planner.plan.batch.sql
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{SqlParserException, TableSchema, _}
-import org.apache.flink.table.planner.utils.{TableTestBase, TestLimitableTableSource}
-
-import org.junit.Test
-
-class LimitTest extends TableTestBase {
-
-  private val util = batchTestUtil()
-  util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-  TestLimitableTableSource.createTemporaryTable(
-    util.tableEnv,
-    Seq(),
-    new TableSchema(
-      Array("a", "b", "c"),
-      Array[TypeInformation[_]](INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)),
-    "LimitTable")
-
-  @Test
-  def testLimitWithoutOffset(): Unit = {
-    util.verifyPlan("SELECT * FROM MyTable LIMIT 5")
-  }
-
-  @Test
-  def testLimit0WithoutOffset(): Unit = {
-    util.verifyPlan("SELECT * FROM MyTable LIMIT 0")
-  }
-
-  @Test(expected = classOf[SqlParserException])
-  def testNegativeLimitWithoutOffset(): Unit = {
-    util.verifyPlan("SELECT * FROM MyTable LIMIT -1")
-  }
-
-  @Test
-  def testLimitWithOffset(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable LIMIT 10 OFFSET 1")
-  }
-
-  @Test
-  def testLimitWithOffset0(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable LIMIT 10 OFFSET 0")
-  }
-
-  @Test
-  def testLimit0WithOffset0(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable LIMIT 0 OFFSET 0")
-  }
-
-  @Test
-  def testLimit0WithOffset(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable LIMIT 0 OFFSET 10")
-  }
-
-  @Test(expected = classOf[SqlParserException])
-  def testLimitWithNegativeOffset(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable LIMIT 10 OFFSET -1")
-  }
-
-  @Test
-  def testFetchWithOffset(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY")
-  }
-
-  @Test
-  def testFetchWithoutOffset(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable FETCH FIRST 10 ROWS ONLY")
-  }
-
-  @Test
-  def testFetch0WithoutOffset(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable FETCH FIRST 0 ROWS ONLY")
-  }
-
-  @Test
-  def testOnlyOffset(): Unit = {
-    util.verifyPlan("SELECT a, c FROM MyTable OFFSET 10 ROWS")
-  }
-
-  @Test
-  def testFetchWithLimitSource(): Unit = {
-    val sqlQuery = "SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testOrderByWithLimitSource(): Unit = {
-    val sqlQuery = "SELECT a, c FROM LimitTable ORDER BY c LIMIT 10"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testLimitWithLimitSource(): Unit = {
-    val sqlQuery = "SELECT a, c FROM LimitTable LIMIT 10"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testLimitWithOffsetAndLimitSource(): Unit = {
-    val sqlQuery = "SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testFetchWithOffsetAndLimitSource(): Unit = {
-    val sqlQuery = "SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY"
-    util.verifyPlan(sqlQuery)
+import org.apache.flink.table.api._
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
+
+/**
+ * The plans of the following unit tests in LimitTest.xml differ slightly from those in
+ * LegacyLimitTest.xml, because TestValuesTableSource implements
+ * [[SupportsProjectionPushDown]] while TestLegacyLimitableTableSource does not,
+ * so the Calc (projection) is pushed down into the scan:
+ * 1. testFetchWithOffsetAndLimitSource
+ * 2. testOrderByWithLimitSource
+ * 3. testLimitWithLimitSource
+ * 4. testLimitWithOffsetAndLimitSource
+ * 5. testFetchWithLimitSource
+ */
+class LimitTest extends LegacyLimitTest {
+
+  override def setup(): Unit = {
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val ddl =
+      s"""
+         |CREATE TABLE LimitTable (
+         |  a int,
+         |  b bigint,
+         |  c string
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    util.tableEnv.executeSql(ddl)
   }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.scala
new file mode 100644
index 0000000..566fe1b7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRuleTest.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.SortProjectTransposeRule
+import org.apache.calcite.tools.RuleSets
+import org.apache.flink.table.api.SqlParserException
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan, FlinkLogicalSort}
+import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase}
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[PushLimitIntoLegacyTableSourceScanRule]].
+ */
+class PushLimitIntoLegacyTableSourceScanRuleTest extends TableTestBase {
+  protected val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+    val calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv.getConfig)
+    calciteConfig.getBatchProgram.get.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
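+        // SortProjectTransposeRule transposes Sort and Project so that the Sort
+        // ends up directly on top of the scan, where the limit rule can match it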
+        .add(RuleSets.ofList(PushLimitIntoLegacyTableSourceScanRule.INSTANCE,
+          SortProjectTransposeRule.INSTANCE,
+          // converts a Calcite rel (RelNode) into a Flink rel (FlinkRelNode)
+          FlinkLogicalSort.BATCH_CONVERTER,
+          FlinkLogicalLegacyTableSourceScan.CONVERTER))
+        .build()
+    )
+
+    val ddl =
+      s"""
+         |CREATE TABLE LimitTable (
+         |  a int,
+         |  b bigint,
+         |  c string
+         |) WITH (
+         |  'connector.type' = 'TestLimitableTableSource',
+         |  'is-bounded' = 'true'
+         |)
+       """.stripMargin
+    util.tableEnv.executeSql(ddl)
+  }
+
+  @Test(expected = classOf[SqlParserException])
+  def testLimitWithNegativeOffset(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 10 OFFSET -1")
+  }
+
+  @Test(expected = classOf[SqlParserException])
+  def testNegativeLimitWithoutOffset(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable LIMIT -1")
+  }
+
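+  // MySQL-style "LIMIT offset, count" is not valid Flink SQL and should fail to parse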
+  @Test(expected = classOf[SqlParserException])
+  def testMysqlLimit(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 1, 10")
+  }
+
+  @Test
+  def testCanPushdownLimitWithoutOffset(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 5")
+  }
+
+  @Test
+  def testCanPushdownLimitWithOffset(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable LIMIT 10 OFFSET 1")
+  }
+
+  @Test
+  def testCanPushdownFetchWithOffset(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY")
+  }
+
+  @Test
+  def testCanPushdownFetchWithoutOffset(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable FETCH FIRST 10 ROWS ONLY")
+  }
+
+  @Test
+  def testCannotPushDownWithoutLimit(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable OFFSET 10")
+  }
+
+  @Test
+  def testCannotPushDownWithoutFetch(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable OFFSET 10 ROWS")
+  }
+
+  @Test
+  def testCannotPushDownWithOrderBy(): Unit = {
+    util.verifyPlan("SELECT a, c FROM LimitTable ORDER BY c LIMIT 10")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
similarity index 92%
copy from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
index 1f49b67..a6273e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
@@ -19,19 +19,21 @@
 package org.apache.flink.table.planner.runtime.batch.sql
 
 import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
+import org.apache.flink.table.planner.utils.TestLegacyLimitableTableSource
+
 import org.junit._
 
-class LimitITCase extends BatchTestBase {
+class LegacyLimitITCase extends BatchTestBase {
 
   @Before
   override def before(): Unit = {
     super.before()
     registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
 
-    TestLimitableTableSource.createTemporaryTable(
+    TestLegacyLimitableTableSource.createTemporaryTable(
       tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
index 1f49b67..8b87824 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
@@ -18,97 +18,28 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql
 
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.planner.runtime.utils.BatchTestBase
-import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
-import org.junit._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.TestData.{data3, nullablesOfData3, type3}
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
 
-class LimitITCase extends BatchTestBase {
-
-  @Before
+class LimitITCase extends LegacyLimitITCase {
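+  // All test methods are inherited from LegacyLimitITCase; only the table
+  // registration is overridden to use the new 'values' connector.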
   override def before(): Unit = {
-    super.before()
+    BatchTestBase.configForMiniCluster(conf)
     registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
 
-    TestLimitableTableSource.createTemporaryTable(
-      tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
-  }
-
-  @Test
-  def testOffsetAndFetch(): Unit = {
-    checkSize(
-      "SELECT * FROM Table3 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY",
-      5)
-  }
-
-  @Test
-  def testOffsetAndLimit(): Unit = {
-    checkSize(
-      "SELECT * FROM Table3 LIMIT 10 OFFSET 2",
-      10)
-  }
-
-  @Test
-  def testFetch(): Unit = {
-    checkSize(
-      "SELECT * FROM Table3 FETCH NEXT 10 ROWS ONLY",
-      10)
-  }
-
-  @Test
-  def testFetchWithLimitTable(): Unit = {
-    checkSize(
-      "SELECT * FROM LimitTable FETCH NEXT 10 ROWS ONLY",
-      10)
-  }
-
-  @Test
-  def testFetchFirst(): Unit = {
-    checkSize(
-      "SELECT * FROM Table3 FETCH FIRST 10 ROWS ONLY",
-      10)
-  }
-
-  @Test
-  def testFetchFirstWithLimitTable(): Unit = {
-    checkSize(
-      "SELECT * FROM LimitTable FETCH FIRST 10 ROWS ONLY",
-      10)
-  }
-
-  @Test
-  def testLimit(): Unit = {
-    checkSize(
-      "SELECT * FROM Table3 LIMIT 5",
-      5)
-  }
-
-  @Test
-  def testLimit0WithLimitTable(): Unit = {
-    checkSize(
-      "SELECT * FROM LimitTable LIMIT 0",
-      0)
-  }
-
-  @Test
-  def testLimitWithLimitTable(): Unit = {
-    checkSize(
-      "SELECT * FROM LimitTable LIMIT 5",
-      5)
-  }
-
-  @Test
-  def testLessThanOffset(): Unit = {
-    checkSize(
-      "SELECT * FROM Table3 OFFSET 2 ROWS FETCH NEXT 50 ROWS ONLY",
-      19)
-  }
-
-  @Test
-  def testLessThanOffsetWithLimitSource(): Unit = {
-    checkSize(
-      "SELECT * FROM LimitTable OFFSET 2 ROWS FETCH NEXT 50 ROWS ONLY",
-      19)
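+    // registerData stores the rows under a generated id; the 'values' connector
+    // reads them back through the 'data-id' option in the DDL below.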
+    val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
+    val ddl =
+      s"""
+         |CREATE TABLE LimitTable (
+         |  a int,
+         |  b bigint,
+         |  c string
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$myTableDataId',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LegacyLimitITCase.scala
similarity index 90%
copy from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LegacyLimitITCase.scala
index 9562a40..a6e310a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LegacyLimitITCase.scala
@@ -21,17 +21,17 @@ package org.apache.flink.table.planner.runtime.batch.table
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
+import org.apache.flink.table.planner.utils.TestLegacyLimitableTableSource
 import org.junit.Assert.assertEquals
 import org.junit._
 
-class LimitITCase extends BatchTestBase {
+class LegacyLimitITCase extends BatchTestBase {
 
   @Before
   override def before(): Unit = {
     super.before()
 
-    TestLimitableTableSource.createTemporaryTable(
+    TestLegacyLimitableTableSource.createTemporaryTable(
       tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
index 9562a40..ccda349 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
@@ -18,41 +18,29 @@
 
 package org.apache.flink.table.planner.runtime.batch.table
 
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.planner.runtime.utils.BatchTestBase
-import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.table.planner.utils.TestLimitableTableSource
-import org.junit.Assert.assertEquals
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
 import org.junit._
 
-class LimitITCase extends BatchTestBase {
+class LimitITCase extends LegacyLimitITCase {
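+  // As in the SQL variant, the test methods come from LegacyLimitITCase and only
+  // the table registration is swapped to the 'values' connector.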
 
   @Before
   override def before(): Unit = {
-    super.before()
-
-    TestLimitableTableSource.createTemporaryTable(
-      tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes), "LimitTable")
-  }
-
-  @Test
-  def testFetch(): Unit = {
-    assertEquals(
-      executeQuery(tEnv.from("LimitTable").fetch(5)).size,
-      5)
-  }
-
-  @Test
-  def testOffset(): Unit = {
-    assertEquals(
-      executeQuery(tEnv.from("LimitTable").offset(5)).size,
-      16)
-  }
-
-  @Test
-  def testOffsetAndFetch(): Unit = {
-    assertEquals(
-      executeQuery(tEnv.from("LimitTable").limit(5, 5)).size,
-      5)
+    BatchTestBase.configForMiniCluster(conf)
+
+    val myTableDataId = TestValuesTableFactory.registerData(TestData.data3)
+    val ddl =
+      s"""
+         |CREATE TABLE LimitTable (
+         |  a int,
+         |  b bigint,
+         |  c string
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$myTableDataId',
+         |  'bounded' = 'true'
+         |)
+       """.stripMargin
+    tEnv.executeSql(ddl)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLegacyLimitableTableSource.scala
similarity index 94%
rename from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
rename to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLegacyLimitableTableSource.scala
index 81e6e46..211bf55 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLegacyLimitableTableSource.scala
@@ -41,7 +41,7 @@ import scala.collection.JavaConverters._
 /**
   * A table source that supports pushing the limit down into the source.
   */
-class TestLimitableTableSource(
+class TestLegacyLimitableTableSource(
     data: Seq[Row],
     rowType: RowTypeInfo,
     var limit: Long = -1,
@@ -66,7 +66,7 @@ class TestLimitableTableSource(
   }
 
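+  // applyLimit returns a new source instance carrying the given limit; the
+  // push-down does not mutate this source.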
   override def applyLimit(limit: Long): TableSource[Row] = {
-    new TestLimitableTableSource(data, rowType, limit, limitablePushedDown)
+    new TestLegacyLimitableTableSource(data, rowType, limit, limitablePushedDown)
   }
 
   override def isLimitPushedDown: Boolean = limitablePushedDown
@@ -84,7 +84,7 @@ class TestLimitableTableSource(
   override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(rowType)
 }
 
-object TestLimitableTableSource {
+object TestLegacyLimitableTableSource {
   def createTemporaryTable(
       tEnv: TableEnvironment,
       data: Seq[Row],
@@ -100,7 +100,7 @@ object TestLimitableTableSource {
   }
 }
 
-class TestLimitableTableSourceFactory extends StreamTableSourceFactory[Row] {
+class TestLegacyLimitableTableSourceFactory extends StreamTableSourceFactory[Row] {
   override def createStreamTableSource(
       properties: util.Map[String, String]): StreamTableSource[Row] = {
     val dp = new DescriptorProperties
@@ -117,7 +117,7 @@ class TestLimitableTableSourceFactory extends StreamTableSourceFactory[Row] {
     val limit = dp.getOptionalLong("limit").orElse(-1L)
 
     val limitablePushedDown = dp.getOptionalBoolean("limitable-push-down").orElse(false)
-    new TestLimitableTableSource(
+    new TestLegacyLimitableTableSource(
       data, tableSchema.toRowType.asInstanceOf[RowTypeInfo], limit, limitablePushedDown)
   }