Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/09 13:37:27 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

godfreyhe commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452130167



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;

Review comment:
       these lines can be simplified as
   ```
   return tableSourceTable != null
       && tableSourceTable.tableSource() instanceof SupportsFilterPushDown
       && tableSourceTable.extraDigests().length == 0;
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(

Review comment:
       extend the `TableSourceTable#copy` method and use it here
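
   For illustration, a sketch of what such a `copy` overload could look like (the signature is a hypothetical sketch, not the existing Flink API):
   ```
   // Hypothetical overload on TableSourceTable; it mirrors the constructor
   // call below and carries over everything else from this table.
   public TableSourceTable copy(
   		DynamicTableSource newTableSource,
   		FlinkStatistic newStatistic,
   		String[] newExtraDigests) {
   	return new TableSourceTable(
   		getRelOptSchema(),
   		tableIdentifier(),
   		getRowType(),
   		newStatistic,
   		newTableSource,
   		isStreamingMode(),
   		catalogTable(),
   		dynamicOptions(),
   		newExtraDigests);
   }
   ```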

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);

Review comment:
       why is `tableSourceTable.extraDigests().length == 0` needed? `PushProjectIntoTableSourceScanRule` may be applied before this rule. Instead, we can check whether the `extraDigests` in `tableSourceTable` already contain the digest generated by this rule, to avoid an endless loop.
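
   A sketch of that guard in `matches()` (illustrative only; `"filter=["` is the digest prefix this rule itself appends in `pushFilterIntoScan`):
   ```
   if (tableSourceTable == null
   		|| !(tableSourceTable.tableSource() instanceof SupportsFilterPushDown)) {
   	return false;
   }
   // skip scans whose digests show this rule already pushed a filter down,
   // otherwise the rule would keep matching its own output
   return Arrays.stream(tableSourceTable.extraDigests())
   	.noneMatch(digest -> digest.startsWith("filter=["));
   ```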

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+

Review comment:
       redundant blank line

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for [[PushFilterIntoTableSourceScanRule]].
+ */
+public class PushFilterIntoTableSourceScanRuleTest extends PushFilterIntoLegacyTableSourceScanRuleTest {
+
+	@Override
+	public void setup() {
+		util().buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+		CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util().tableEnv().getConfig());
+		calciteConfig.getBatchProgram().get().addLast(
+			"rules",
+			FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+				.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+				.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+				.add(RuleSets.ofList(PushFilterIntoTableSourceScanRule.INSTANCE,
+					FilterProjectTransposeRule.INSTANCE))
+				.build()
+		);
+		// name: STRING, id: LONG, amount: INT, price: DOUBLE
+		String ddl1 =
+			"CREATE TABLE MyTable (\n" +
+				"  name STRING,\n" +
+				"  id bigint,\n" +
+				"  amount int,\n" +
+				"  price double\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'filterable-fields' = 'amount',\n" +
+				" 'bounded' = 'true'\n" +
+				")";
+		util().tableEnv().executeSql(ddl1);
+
+		String ddl2 =
+			"CREATE TABLE VirtualTable (\n" +
+				"  name STRING,\n" +
+				"  id bigint,\n" +
+				"  amount int,\n" +
+				"  virtualField as amount + 1,\n" +
+				"  price double\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'filterable-fields' = 'amount',\n" +
+				" 'bounded' = 'true'\n" +
+				")";
+
+		util().tableEnv().executeSql(ddl2);
+	}
+
+	@Override
+	public void testLowerUpperPushdown() {
+		String ddl3 =

Review comment:
       `ddl3` -> `ddl`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+		TableSourceTable newTableSourceTable = new TableSourceTable(
+			oldTableSourceTable.getRelOptSchema(),
+			oldTableSourceTable.tableIdentifier(),
+			oldTableSourceTable.getRowType(),
+			newStatistic,
+			newTableSource,
+			oldTableSourceTable.isStreamingMode(),
+			oldTableSourceTable.catalogTable(),
+			oldTableSourceTable.dynamicOptions(),
+			newExtraDigests
+		);
+		TableScan newScan = new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);
+
+		// check whether framework still need to do a filter
+		if (result.getRemainingFilters().isEmpty() && unconvertedRexNodes.length == 0) {
+			call.transformTo(newScan);
+		} else {
+			relBuilder.push(scan);
+			ExpressionConverter converter = new ExpressionConverter(relBuilder);
+			List<RexNode> remainingConditions = result.getRemainingFilters().stream().map(e -> e.accept(converter)).collect(Collectors.toList());
+			remainingConditions.addAll(Arrays.asList(unconvertedRexNodes));
+			RexNode remainingCondition = remainingConditions.stream().reduce((l, r) -> relBuilder.and(l, r)).get();

Review comment:
       this line can be simplified as `RexNode remainingCondition = relBuilder.and(remainingConditions);`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+	private void pushFilterIntoScan(

Review comment:
       It would be better to split this method into a few smaller methods.
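
   For example, the statistics update could move into a small helper (names are illustrative):
   ```
   // Keep the old statistic unless predicates were actually pushed down;
   // once filters are consumed by the source, the table stats are stale.
   private FlinkStatistic recomputeStatistic(
   		FlinkStatistic oldStatistic,
   		int originPredicatesSize,
   		int updatedPredicatesSize) {
   	if (originPredicatesSize == updatedPredicatesSize
   			|| oldStatistic == FlinkStatistic.UNKNOWN()) {
   		return oldStatistic;
   	}
   	return FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
   }
   ```
   Similar helpers could extract the predicate extraction and the digest update, leaving `pushFilterIntoScan` as a short sequence of named steps.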

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {
+			if (expr instanceof ValueLiteralExpression) {
+				Optional value = ((ValueLiteralExpression) expr).getValueAs(((ValueLiteralExpression) expr).getOutputDataType().getConversionClass());
+				if (value.isPresent()) {
+					return (Comparable) value.get();
+				} else {
+					return null;
+				}

Review comment:
       `return (Comparable) value.orElse(null);`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].

Review comment:
        `[[LogicalTableScan]]` -> `{@link LogicalTableScan}`. Also add more detail, e.g. `whose table is a {@link TableSourceTable} and whose table source implements {@link SupportsFilterPushDown}`, to explain when the rule applies.
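
    Applied, the class comment might read:
    ```
    /**
     * Planner rule that tries to push a filter into a {@link LogicalTableScan},
     * whose table is a {@link TableSourceTable} with a table source that
     * implements {@link SupportsFilterPushDown}.
     */
    ```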

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));

Review comment:
       give it a more meaningful name, e.g. `pushDownResult`?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
##########
+/**
+ * Test for [[PushFilterIntoTableSourceScanRule]].

Review comment:
       `[[PushFilterIntoTableSourceScanRule]]` -> `{@link PushFilterIntoTableSourceScanRule}`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests

Review comment:
       nit: indent

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -333,28 +364,34 @@ private ChangelogMode parseChangelogMode(String string) {
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
 		private final Collection<Row> data;
 		private final boolean isAsync;
-		private final @Nullable String lookupFunctionClass;
+		private final @Nullable

Review comment:
       revert this change? Or just put `@Nullable` on a single line, like
   ```
   @Nullable
   private final String lookupFunctionClass;
   ```
   

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(
+			oldTableSourceTable.getRelOptSchema(),
+			oldTableSourceTable.tableIdentifier(),
+			oldTableSourceTable.getRowType(),
+			newStatistic,
+			newTableSource,
+			oldTableSourceTable.isStreamingMode(),
+			oldTableSourceTable.catalogTable(),
+			oldTableSourceTable.dynamicOptions(),
+			newExtraDigests
+		);
+		TableScan newScan = new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);

Review comment:
       use `LogicalTableScan.create`
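   e.g. (a sketch, assuming the hints-aware `create` overload in the Calcite version we depend on; the factory derives the trait set from the table metadata instead of copying the old scan's traits):
   ```
   TableScan newScan = LogicalTableScan.create(
       scan.getCluster(), newTableSourceTable, scan.getHints());
   ```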

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -333,28 +364,34 @@ private ChangelogMode parseChangelogMode(String string) {
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
 		private final Collection<Row> data;
 		private final boolean isAsync;
-		private final @Nullable String lookupFunctionClass;
+		private final @Nullable
+		String lookupFunctionClass;
 		private final boolean nestedProjectionSupported;
-		private @Nullable int[] projectedFields;
+		private @Nullable

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -460,34 +620,38 @@ public DynamicTableSource copy() {
 				isAsync,
 				lookupFunctionClass,
 				nestedProjectionSupported,
-				projectedFields);
+				projectedFields,
+				filterPredicates,
+				filterableFields);
 		}
 
 		@Override
 		public String asSummaryString() {
 			return "TestValues";
 		}
 
-		private static Collection<RowData> convertToRowData(
-				Collection<Row> data,
-				int[] projectedFields,
-				DataStructureConverter converter) {
+		private Collection<RowData> convertToRowData(
+			Collection<Row> data,
+			int[] projectedFields,
+			DataStructureConverter converter) {
 			List<RowData> result = new ArrayList<>();
 			for (Row value : data) {
-				Row projectedRow;
-				if (projectedFields == null) {
-					projectedRow = value;
-				} else {
-					Object[] newValues = new Object[projectedFields.length];
-					for (int i = 0; i < projectedFields.length; ++i) {
-						newValues[i] = value.getField(projectedFields[i]);
+				if (applyPredicatesToRow(value)) {

Review comment:
       change to
   ```
   if (!applyPredicatesToRow(value)) {
       continue;
   }
   ```
   this would make the following block more readable
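   i.e. the loop becomes a flat guard clause followed by the existing projection code (a sketch; the elided part stays as in the original):
   ```
   for (Row value : data) {
       if (!applyPredicatesToRow(value)) {
           continue;
       }
       // ... projection and conversion as before, now one level less nested
   }
   ```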
   

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {

Review comment:
       use `Comparable<?>` to make the IDE happy?
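   i.e.:
   ```
   private Comparable<?> getValue(Expression expr, Row row) {
   ```
   note the comparison site then needs an unchecked cast (e.g. `((Comparable<Object>) lhsValue).compareTo(rhsValue)`), so the warning moves rather than disappears.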

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -240,6 +264,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
 		String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
 		boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
+		Optional<List<String>> fields = helper.getOptions().getOptional(FILTERABLE_FIELDS);

Review comment:
       nit: `fields` is too generic
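   e.g. name it after what it holds:
   ```
   Optional<List<String>> filterableFields = helper.getOptions().getOptional(FILTERABLE_FIELDS);
   ```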

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -562,11 +726,11 @@ public Result applyFilters(List<ResolvedExpression> filters) {
 		private final int expectedNum;
 
 		private TestValuesTableSink(
-				TableSchema schema,
-				String tableName,
-				boolean isInsertOnly,
-				String runtimeSink,
-				int expectedNum) {
+			TableSchema schema,
+			String tableName,
+			boolean isInsertOnly,
+			String runtimeSink,
+			int expectedNum) {

Review comment:
       It seems the indentation is not correct; please revert these changes.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);

Review comment:
       we should limit the supported types of the filter fields in the `applyFilters` method, otherwise we can't cast the extracted values to `Comparable` directly. Some types are not `Comparable`.
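   a sketch of such a check (assuming we only accept fields whose Java conversion class implements `Comparable`):
   ```
   private boolean isComparableType(FieldReferenceExpression fieldRef) {
       // only push down filters on fields whose conversion class is Comparable
       Class<?> clazz = fieldRef.getOutputDataType().getConversionClass();
       return Comparable.class.isAssignableFrom(clazz);
   }
   ```
   and call it from `shouldPushDownUnaryExpression` next to the `filterableFields` check.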

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -333,28 +364,34 @@ private ChangelogMode parseChangelogMode(String string) {
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
 		private final Collection<Row> data;
 		private final boolean isAsync;
-		private final @Nullable String lookupFunctionClass;
+		private final @Nullable
+		String lookupFunctionClass;
 		private final boolean nestedProjectionSupported;
-		private @Nullable int[] projectedFields;
+		private @Nullable
+		int[] projectedFields;
+		private List<ResolvedExpression> filterPredicates;
+		private Set<String> filterableFields;

Review comment:
       add `final` 

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {
+			if (expr instanceof ValueLiteralExpression) {
+				Optional value = ((ValueLiteralExpression) expr).getValueAs(((ValueLiteralExpression) expr).getOutputDataType().getConversionClass());
+				if (value.isPresent()) {
+					return (Comparable) value.get();
+				} else {
+					return null;
+				}
+			}
+
+			if (expr instanceof FieldReferenceExpression) {
+				RowTypeInfo rowTypeInfo = new RowTypeInfo(TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataTypes()), physicalSchema.getFieldNames());
+				int idx = rowTypeInfo.getFieldIndex(((FieldReferenceExpression) expr).getName());

Review comment:
       find the index through `physicalSchema.getFieldNames()` directly?
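   e.g.:
   ```
   int idx = Arrays.asList(physicalSchema.getFieldNames())
       .indexOf(((FieldReferenceExpression) expr).getName());
   ```
   that avoids building a `RowTypeInfo` just to look up a field position.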

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {

Review comment:
       this method is unnecessary
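   the two lookups can simply live at the call site, e.g.:
   ```
   Comparable lhsValue = getValue(children.get(0), row);
   Comparable rhsValue = getValue(children.get(1), row);
   ```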




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org