Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/08 08:41:00 UTC

[GitHub] [flink] liuyongvs opened a new pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

liuyongvs opened a new pull request #12851:
URL: https://github.com/apache/flink/pull/12851


   ## What is the purpose of the change
   - Make the DynamicSource support the FilterPushDown rule (a connector-side sketch follows below)
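
   For context, a source connector opts into this rule by implementing the `SupportsFilterPushDown` ability interface. A minimal sketch of such a connector (the class name and the `canPushDown` check are illustrative, not part of this PR; the runtime provider is omitted):

   ```
   import org.apache.flink.table.connector.ChangelogMode;
   import org.apache.flink.table.connector.source.DynamicTableSource;
   import org.apache.flink.table.connector.source.ScanTableSource;
   import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
   import org.apache.flink.table.expressions.ResolvedExpression;

   import java.util.ArrayList;
   import java.util.List;

   /** Illustrative source that accepts some filters and leaves the rest to the planner. */
   public class FilterableSource implements ScanTableSource, SupportsFilterPushDown {

   	private List<ResolvedExpression> pushedFilters = new ArrayList<>();

   	@Override
   	public Result applyFilters(List<ResolvedExpression> filters) {
   		List<ResolvedExpression> accepted = new ArrayList<>();
   		List<ResolvedExpression> remaining = new ArrayList<>();
   		for (ResolvedExpression filter : filters) {
   			if (canPushDown(filter)) { // hypothetical connector-specific check
   				accepted.add(filter);
   			} else {
   				remaining.add(filter);
   			}
   		}
   		this.pushedFilters = accepted;
   		// Filters reported as remaining stay in a Filter node above the scan.
   		return Result.of(accepted, remaining);
   	}

   	private boolean canPushDown(ResolvedExpression filter) {
   		return false; // placeholder: e.g. only simple comparisons on filterable fields
   	}

   	@Override
   	public DynamicTableSource copy() {
   		FilterableSource copy = new FilterableSource();
   		copy.pushedFilters = new ArrayList<>(pushedFilters);
   		return copy;
   	}

   	@Override
   	public ChangelogMode getChangelogMode() {
   		return ChangelogMode.insertOnly();
   	}

   	@Override
   	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
   		throw new UnsupportedOperationException("runtime provider omitted in this sketch");
   	}

   	@Override
   	public String asSummaryString() {
   		return "FilterableSource";
   	}
   }
   ```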
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
   - Added PushFilterIntoTableSourceScanRuleTest to verify the plan
   - Extended TableSourceITCase (batch and stream) to verify the results of filter push-down
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
  - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-659867535


   @KurtYoung, OK. I opened a new PR, which @godfreyhe is reviewing now.





[GitHub] [flink] liuyongvs commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655842803


   Hi @wuchong, Azure passed. Could you spend some time reviewing this PR?





[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452603891



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));

Review comment:
       ok







[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452577056



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -562,11 +726,11 @@ public Result applyFilters(List<ResolvedExpression> filters) {
 		private final int expectedNum;
 
 		private TestValuesTableSink(
-				TableSchema schema,
-				String tableName,
-				boolean isInsertOnly,
-				String runtimeSink,
-				int expectedNum) {
+			TableSchema schema,
+			String tableName,
+			boolean isInsertOnly,
+			String runtimeSink,
+			int expectedNum) {

Review comment:
       This indentation was not changed by me; it was reformatted automatically by IDEA when I ran it.







[GitHub] [flink] godfreyhe commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452228575



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()

Review comment:
       It's better to make sure a single line is not too long; shorter lines are easier to read.
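
   For example, the quoted digest construction could be wrapped like this (a sketch; same logic, only reflowed):

   ```
   // Same expression as in the diff above, broken across lines for readability.
   String extraDigests = "filter=["
   	+ result.getAcceptedFilters().stream()
   		.reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN()))
   		.get()
   		.toString()
   	+ "]";
   ```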







[GitHub] [flink] liuyongvs edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-656568865


   Hi @godfreyhe,
   due to a careless operation I deleted my GitHub repo, so I opened a new PR here: https://github.com/apache/flink/pull/12866. The third commit addresses all of your review comments. I am very sorry.





[GitHub] [flink] flinkbot commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7efb346a9767797762801f29a4d5ac0843f123f3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7efb346a9767797762801f29a4d5ac0843f123f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330) 
   * 7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4342",
       "triggerID" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "423bccc86580b6a811598fb1dc2f27fce607a1e2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4336",
       "triggerID" : "423bccc86580b6a811598fb1dc2f27fce607a1e2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7efb346a9767797762801f29a4d5ac0843f123f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330) 
   * 7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2 UNKNOWN
   * b79337059dea45b43d88cf3e4b07309dcd711d2b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4342) 
   * 423bccc86580b6a811598fb1dc2f27fce607a1e2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4336) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452130167



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;

Review comment:
       These lines can be simplified to:
   ```
   return tableSourceTable != null
   	&& tableSourceTable.tableSource() instanceof SupportsFilterPushDown
   	&& tableSourceTable.extraDigests().length == 0;
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(

Review comment:
       Extend the `TableSourceTable#copy` method and use it here, so the nine constructor arguments are not repeated inside the rule.
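
   A sketch of such an overload (assumption: the accessor names mirror the constructor call in this PR; the exact signature is up for discussion):

   ```
   // Sketch: copy this table, swapping only the source, statistic and digests.
   public TableSourceTable copy(
   		DynamicTableSource newTableSource,
   		FlinkStatistic newStatistic,
   		String[] newExtraDigests) {
   	return new TableSourceTable(
   		getRelOptSchema(),
   		tableIdentifier(),
   		getRowType(),
   		newStatistic,
   		newTableSource,
   		isStreamingMode(),
   		catalogTable(),
   		dynamicOptions(),
   		newExtraDigests);
   }
   ```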

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);

Review comment:
       Why is `tableSourceTable.extraDigests().length == 0` needed? `PushProjectIntoTableSourceScanRule` may have been applied before this rule. We could instead check whether the `extraDigests` of `tableSourceTable` already contain the digest generated by this rule, to avoid an endless loop.
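
   A sketch of such a check (the helper name is hypothetical; it keys off the `filter=[` digest prefix that this rule appends in `pushFilterIntoScan`):

   ```
   // Hypothetical helper: only bail out when this rule already pushed a filter,
   // instead of requiring extraDigests to be completely empty.
   private boolean filterAlreadyPushedDown(TableSourceTable table) {
   	return Arrays.stream(table.extraDigests())
   		.anyMatch(digest -> digest.startsWith("filter=["));
   }
   ```

   `matches` could then return `tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && !filterAlreadyPushedDown(tableSourceTable)`.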

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+

Review comment:
       redundant line

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,94 @@
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for [[PushFilterIntoTableSourceScanRule]].
+ */
+public class PushFilterIntoTableSourceScanRuleTest extends PushFilterIntoLegacyTableSourceScanRuleTest {
+
+	@Override
+	public void setup() {
+		util().buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+		CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util().tableEnv().getConfig());
+		calciteConfig.getBatchProgram().get().addLast(
+			"rules",
+			FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+				.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+				.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+				.add(RuleSets.ofList(PushFilterIntoTableSourceScanRule.INSTANCE,
+					FilterProjectTransposeRule.INSTANCE))
+				.build()
+		);
+		// name: STRING, id: LONG, amount: INT, price: DOUBLE
+		String ddl1 =
+			"CREATE TABLE MyTable (\n" +
+				"  name STRING,\n" +
+				"  id bigint,\n" +
+				"  amount int,\n" +
+				"  price double\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'filterable-fields' = 'amount',\n" +
+				" 'bounded' = 'true'\n" +
+				")";
+		util().tableEnv().executeSql(ddl1);
+
+		String ddl2 =
+			"CREATE TABLE VirtualTable (\n" +
+				"  name STRING,\n" +
+				"  id bigint,\n" +
+				"  amount int,\n" +
+				"  virtualField as amount + 1,\n" +
+				"  price double\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'filterable-fields' = 'amount',\n" +
+				" 'bounded' = 'true'\n" +
+				")";
+
+		util().tableEnv().executeSql(ddl2);
+	}
+
+	@Override
+	public void testLowerUpperPushdown() {
+		String ddl3 =

Review comment:
       `ddl3` -> `ddl`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(
+			oldTableSourceTable.getRelOptSchema(),
+			oldTableSourceTable.tableIdentifier(),
+			oldTableSourceTable.getRowType(),
+			newStatistic,
+			newTableSource,
+			oldTableSourceTable.isStreamingMode(),
+			oldTableSourceTable.catalogTable(),
+			oldTableSourceTable.dynamicOptions(),
+			newExtraDigests
+		);
+		TableScan newScan = new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);
+
+		// check whether framework still need to do a filter
+		if (result.getRemainingFilters().isEmpty() && unconvertedRexNodes.length == 0) {
+			call.transformTo(newScan);
+		} else {
+			relBuilder.push(scan);
+			ExpressionConverter converter = new ExpressionConverter(relBuilder);
+			List<RexNode> remainingConditions = result.getRemainingFilters().stream().map(e -> e.accept(converter)).collect(Collectors.toList());
+			remainingConditions.addAll(Arrays.asList(unconvertedRexNodes));
+			RexNode remainingCondition = remainingConditions.stream().reduce((l, r) -> relBuilder.and(l, r)).get();

Review comment:
       This line can be simplified to `RexNode remainingCondition = relBuilder.and(remainingConditions);`.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(

Review comment:
       It would be better to split this method into a few smaller methods.
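
       A minimal sketch of one extracted helper, e.g. for the statistics update (the helper name is hypothetical):
   ```
   private FlinkStatistic recomputeStatistic(
           FlinkStatistic oldStatistic, int originPredicatesSize, int updatedPredicatesSize) {
       if (originPredicatesSize == updatedPredicatesSize || oldStatistic == FlinkStatistic.UNKNOWN()) {
           // keep all statistics if no predicate was pushed down, or nothing is known anyway
           return oldStatistic;
       }
       // drop tableStats once predicates were pushed into the source
       return FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
   }
   ```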

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {
+			if (expr instanceof ValueLiteralExpression) {
+				Optional value = ((ValueLiteralExpression) expr).getValueAs(((ValueLiteralExpression) expr).getOutputDataType().getConversionClass());
+				if (value.isPresent()) {
+					return (Comparable) value.get();
+				} else {
+					return null;
+				}

Review comment:
       `return (Comparable) value.orElse(null);`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].

Review comment:
        `[[LogicalTableScan]]` -> `{@link LogicalTableScan}`. Please also extend the comment, e.g. `whose table is a {@link TableSourceTable} and whose table source implements {@link SupportsFilterPushDown}`, to explain when the rule applies.
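
       For example:
   ```
   /**
    * Planner rule that tries to push a filter into a {@link LogicalTableScan},
    * whose table is a {@link TableSourceTable} and whose table source
    * implements {@link SupportsFilterPushDown}.
    */
   ```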

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));

Review comment:
       Give this a more meaningful name, e.g. `pushDownResult`?
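
       For example:
   ```
   SupportsFilterPushDown.Result pushDownResult = ((SupportsFilterPushDown) newTableSource)
       .applyFilters(resolver.resolve(remainingPredicates));
   ```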

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for [[PushFilterIntoTableSourceScanRule]].

Review comment:
       `[[PushFilterIntoTableSourceScanRule]]` -> `{@link PushFilterIntoTableSourceScanRule}`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests

Review comment:
       nit: indent

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -333,28 +364,34 @@ private ChangelogMode parseChangelogMode(String string) {
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
 		private final Collection<Row> data;
 		private final boolean isAsync;
-		private final @Nullable String lookupFunctionClass;
+		private final @Nullable

Review comment:
       Revert this change? Or just put `@Nullable` on its own line, like
   ```
   @Nullable
   private final String lookupFunctionClass;
   ```
   

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(
+			oldTableSourceTable.getRelOptSchema(),
+			oldTableSourceTable.tableIdentifier(),
+			oldTableSourceTable.getRowType(),
+			newStatistic,
+			newTableSource,
+			oldTableSourceTable.isStreamingMode(),
+			oldTableSourceTable.catalogTable(),
+			oldTableSourceTable.dynamicOptions(),
+			newExtraDigests
+		);
+		TableScan newScan = new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);

Review comment:
       Use `LogicalTableScan.create` instead of calling the constructor directly.
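
       Assuming the Calcite version in use exposes the hints-aware factory method, a sketch:
   ```
   TableScan newScan = LogicalTableScan.create(
       scan.getCluster(), newTableSourceTable, scan.getHints());
   ```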

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -333,28 +364,34 @@ private ChangelogMode parseChangelogMode(String string) {
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
 		private final Collection<Row> data;
 		private final boolean isAsync;
-		private final @Nullable String lookupFunctionClass;
+		private final @Nullable
+		String lookupFunctionClass;
 		private final boolean nestedProjectionSupported;
-		private @Nullable int[] projectedFields;
+		private @Nullable

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -460,34 +620,38 @@ public DynamicTableSource copy() {
 				isAsync,
 				lookupFunctionClass,
 				nestedProjectionSupported,
-				projectedFields);
+				projectedFields,
+				filterPredicates,
+				filterableFields);
 		}
 
 		@Override
 		public String asSummaryString() {
 			return "TestValues";
 		}
 
-		private static Collection<RowData> convertToRowData(
-				Collection<Row> data,
-				int[] projectedFields,
-				DataStructureConverter converter) {
+		private Collection<RowData> convertToRowData(
+			Collection<Row> data,
+			int[] projectedFields,
+			DataStructureConverter converter) {
 			List<RowData> result = new ArrayList<>();
 			for (Row value : data) {
-				Row projectedRow;
-				if (projectedFields == null) {
-					projectedRow = value;
-				} else {
-					Object[] newValues = new Object[projectedFields.length];
-					for (int i = 0; i < projectedFields.length; ++i) {
-						newValues[i] = value.getField(projectedFields[i]);
+				if (applyPredicatesToRow(value)) {

Review comment:
       Change to
   ```
   if (!applyPredicatesToRow(value)) {
       continue;
   }
   ```
   This would make the following block more readable.
   

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {

Review comment:
       Use `Comparable<?>` to avoid raw-type warnings in the IDE?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -240,6 +264,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
 		String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
 		boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
+		Optional<List<String>> fields = helper.getOptions().getOptional(FILTERABLE_FIELDS);

Review comment:
       nit: `fields` is too generic
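
       For example (the name is just a suggestion):
   ```
   Optional<List<String>> filterableFields = helper.getOptions().getOptional(FILTERABLE_FIELDS);
   ```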

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -562,11 +726,11 @@ public Result applyFilters(List<ResolvedExpression> filters) {
 		private final int expectedNum;
 
 		private TestValuesTableSink(
-				TableSchema schema,
-				String tableName,
-				boolean isInsertOnly,
-				String runtimeSink,
-				int expectedNum) {
+			TableSchema schema,
+			String tableName,
+			boolean isInsertOnly,
+			String runtimeSink,
+			int expectedNum) {

Review comment:
       It seems the indentation is not correct; please revert these changes.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);

Review comment:
       We should limit the supported types of the filterable fields in the `applyFilters` method; otherwise we can't cast the extracted values to `Comparable` directly, because some types are not `Comparable`.
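
       One possible guard, as a sketch (the helper name is hypothetical): only accept a field reference whose conversion class is `Comparable`, so the later cast is safe:
   ```
   private boolean isComparableField(FieldReferenceExpression fieldRef) {
       Class<?> conversionClass = fieldRef.getOutputDataType().getConversionClass();
       return filterableFields.contains(fieldRef.getName())
           && Comparable.class.isAssignableFrom(conversionClass);
   }
   ```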

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -333,28 +364,34 @@ private ChangelogMode parseChangelogMode(String string) {
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
 		private final Collection<Row> data;
 		private final boolean isAsync;
-		private final @Nullable String lookupFunctionClass;
+		private final @Nullable
+		String lookupFunctionClass;
 		private final boolean nestedProjectionSupported;
-		private @Nullable int[] projectedFields;
+		private @Nullable
+		int[] projectedFields;
+		private List<ResolvedExpression> filterPredicates;
+		private Set<String> filterableFields;

Review comment:
       add `final` 

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {
+			if (expr instanceof ValueLiteralExpression) {
+				Optional value = ((ValueLiteralExpression) expr).getValueAs(((ValueLiteralExpression) expr).getOutputDataType().getConversionClass());
+				if (value.isPresent()) {
+					return (Comparable) value.get();
+				} else {
+					return null;
+				}
+			}
+
+			if (expr instanceof FieldReferenceExpression) {
+				RowTypeInfo rowTypeInfo = new RowTypeInfo(TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataTypes()), physicalSchema.getFieldNames());
+				int idx = rowTypeInfo.getFieldIndex(((FieldReferenceExpression) expr).getName());

Review comment:
       Find the index through `physicalSchema.getFieldNames()` directly?
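
       For example:
   ```
   int idx = Arrays.asList(physicalSchema.getFieldNames())
       .indexOf(((FieldReferenceExpression) expr).getName());
   ```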

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {

Review comment:
       This method is unnecessary; it can be inlined into `binaryFilterApplies`.
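
       The two values could be fetched directly in `binaryFilterApplies`, e.g.:
   ```
   Comparable<?> lhsValue = getValue(children.get(0), row);
   Comparable<?> rhsValue = getValue(children.get(1), row);
   ```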




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452581704



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);

Review comment:
       Yes, it is to avoid an endless loop: once the filter has been pushed down, the scan's extra digest prevents the rule from matching the same scan again. I will change it.
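       For the record, the guard works because the pushed-down filter is recorded in the scan's extra digests, so a scan that already carries a filter digest is skipped. A hypothetical helper (the name is made up here) that makes the intent explicit:

       ```java
       // true if this scan already carries a pushed-down filter digest; in that case
       // the rule must not fire again, otherwise it would match its own output forever
       private boolean filterAlreadyPushedDown(TableSourceTable table) {
           return Arrays.stream(table.extraDigests())
               .anyMatch(digest -> digest.startsWith("filter=["));
       }
       ```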




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655853748


   Hi @godfreyhe, do you have time to review this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7efb346a9767797762801f29a4d5ac0843f123f3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452604095



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452591843



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {
+			if (expr instanceof ValueLiteralExpression) {
+				Optional value = ((ValueLiteralExpression) expr).getValueAs(((ValueLiteralExpression) expr).getOutputDataType().getConversionClass());
+				if (value.isPresent()) {
+					return (Comparable) value.get();
+				} else {
+					return null;
+				}
+			}
+
+			if (expr instanceof FieldReferenceExpression) {
+				RowTypeInfo rowTypeInfo = new RowTypeInfo(TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataTypes()), physicalSchema.getFieldNames());
+				int idx = rowTypeInfo.getFieldIndex(((FieldReferenceExpression) expr).getName());

Review comment:
       OK, changed to:
       int idx = Arrays.asList(physicalSchema.getFieldNames()).indexOf(((FieldReferenceExpression) expr).getName());
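       For context, a sketch of the resulting branch (assuming the surrounding `TestValuesTableSource` fields), with no `RowTypeInfo` detour:

       ```java
       if (expr instanceof FieldReferenceExpression) {
           // resolve the field position directly from the physical schema
           int idx = Arrays.asList(physicalSchema.getFieldNames())
               .indexOf(((FieldReferenceExpression) expr).getName());
           return (Comparable) row.getField(idx);
       }
       ```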




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452594080



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -460,34 +620,38 @@ public DynamicTableSource copy() {
 				isAsync,
 				lookupFunctionClass,
 				nestedProjectionSupported,
-				projectedFields);
+				projectedFields,
+				filterPredicates,
+				filterableFields);
 		}
 
 		@Override
 		public String asSummaryString() {
 			return "TestValues";
 		}
 
-		private static Collection<RowData> convertToRowData(
-				Collection<Row> data,
-				int[] projectedFields,
-				DataStructureConverter converter) {
+		private Collection<RowData> convertToRowData(
+			Collection<Row> data,
+			int[] projectedFields,
+			DataStructureConverter converter) {
 			List<RowData> result = new ArrayList<>();
 			for (Row value : data) {
-				Row projectedRow;
-				if (projectedFields == null) {
-					projectedRow = value;
-				} else {
-					Object[] newValues = new Object[projectedFields.length];
-					for (int i = 0; i < projectedFields.length; ++i) {
-						newValues[i] = value.getField(projectedFields[i]);
+				if (applyPredicatesToRow(value)) {

Review comment:
       Thanks. I will rename the function to `isRetainedAfterApplyingFilterPredicates` to make it more readable.
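       A sketch of the call site after the rename (same logic, clearer intent):

       ```java
       for (Row value : data) {
           // skip rows that do not pass the pushed-down predicates
           if (!isRetainedAfterApplyingFilterPredicates(value)) {
               continue;
           }
           // projection and conversion of the retained row stay as before
       }
       ```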

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -333,28 +364,34 @@ private ChangelogMode parseChangelogMode(String string) {
 	/**
 	 * Values {@link DynamicTableSource} for testing.
 	 */
-	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
+	private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown, SupportsFilterPushDown {
 
 		private TableSchema physicalSchema;
 		private final ChangelogMode changelogMode;
 		private final boolean bounded;
 		private final String runtimeSource;
 		private final Collection<Row> data;
 		private final boolean isAsync;
-		private final @Nullable String lookupFunctionClass;
+		private final @Nullable
+		String lookupFunctionClass;
 		private final boolean nestedProjectionSupported;
-		private @Nullable int[] projectedFields;
+		private @Nullable
+		int[] projectedFields;
+		private List<ResolvedExpression> filterPredicates;
+		private Set<String> filterableFields;

Review comment:
       ok
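       Presumably the point is the auto-formatter splitting the `@Nullable` annotations onto their own lines above; keeping each annotated field on one line reads better, e.g.:

       ```java
       private final @Nullable String lookupFunctionClass;
       private @Nullable int[] projectedFields;
       ```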




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452597228



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()

Review comment:
       ok
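       If a flatter digest string were preferred, an alternative sketch (note it changes the digest format) could be:

       ```java
       String extraDigest = result.getAcceptedFilters().stream()
           .map(Object::toString)
           .collect(Collectors.joining(" AND ", "filter=[", "]"));
       ```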




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452579844



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+

Review comment:
       Why is it redundant? Without the local variable, the code would be less readable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452584302



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+

Review comment:
       This line is an empty line, which is what should be removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452604505



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);

Review comment:
       The code here just follows the existing TestFilterableSource implementation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452586065



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for [[PushFilterIntoTableSourceScanRule]].

Review comment:
       Thanks. `[[...]]` is Scaladoc syntax; in Javadoc this should be a `{@link}` reference.
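       For reference, the corrected Javadoc header would be:

       ```java
       /**
        * Test for {@link PushFilterIntoTableSourceScanRule}.
        */
       ```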




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452582266



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);
+			Comparable lhsValue = tuple2._1;
+			Comparable rhsValue = tuple2._2;
+			FunctionDefinition functionDefinition = binExpr.getFunctionDefinition();
+
+			if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) > 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) < 0;
+			} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) >= 0;
+			} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) <= 0;
+			} else if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) == 0;
+			} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+				return lhsValue.compareTo(rhsValue) != 0;
+			} else {
+				return false;
+			}
+		}
+
+		private Tuple2<Comparable, Comparable> extractValues(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			return new Tuple2(getValue(children.get(0), row), getValue(children.get(1), row));
+		}
+
+		private Comparable getValue(Expression expr, Row row) {
+			if (expr instanceof ValueLiteralExpression) {
+				Optional value = ((ValueLiteralExpression) expr).getValueAs(((ValueLiteralExpression) expr).getOutputDataType().getConversionClass());
+				if (value.isPresent()) {
+					return (Comparable) value.get();
+				} else {
+					return null;
+				}

Review comment:
       ok
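       Likely this is about collapsing the Optional handling; a sketch:

       ```java
       if (expr instanceof ValueLiteralExpression) {
           ValueLiteralExpression literal = (ValueLiteralExpression) expr;
           // resolve the literal via its conversion class, or null if absent
           return (Comparable) literal
               .getValueAs(literal.getOutputDataType().getConversionClass())
               .orElse(null);
       }
       ```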




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452580165



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;

Review comment:
       OK, thanks.
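       Presumably the suggestion is to return the condition directly:

       ```java
       TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
       return tableSourceTable != null
           && tableSourceTable.tableSource() instanceof SupportsFilterPushDown
           && tableSourceTable.extraDigests().length == 0;
       ```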




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452580515



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(
+			oldTableSourceTable.getRelOptSchema(),
+			oldTableSourceTable.tableIdentifier(),
+			oldTableSourceTable.getRowType(),
+			newStatistic,
+			newTableSource,
+			oldTableSourceTable.isStreamingMode(),
+			oldTableSourceTable.catalogTable(),
+			oldTableSourceTable.dynamicOptions(),
+			newExtraDigests
+		);
+		TableScan newScan = new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);
+
+		// check whether framework still need to do a filter
+		if (result.getRemainingFilters().isEmpty() && unconvertedRexNodes.length == 0) {
+			call.transformTo(newScan);
+		} else {
+			relBuilder.push(scan);
+			ExpressionConverter converter = new ExpressionConverter(relBuilder);
+			List<RexNode> remainingConditions = result.getRemainingFilters().stream().map(e -> e.accept(converter)).collect(Collectors.toList());
+			remainingConditions.addAll(Arrays.asList(unconvertedRexNodes));
+			RexNode remainingCondition = remainingConditions.stream().reduce((l, r) -> relBuilder.and(l, r)).get();

Review comment:
       thanks 







[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452580785



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for [[PushFilterIntoTableSourceScanRule]].
+ */
+public class PushFilterIntoTableSourceScanRuleTest extends PushFilterIntoLegacyTableSourceScanRuleTest {
+
+	@Override
+	public void setup() {
+		util().buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+		CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util().tableEnv().getConfig());
+		calciteConfig.getBatchProgram().get().addLast(
+			"rules",
+			FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+				.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+				.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+				.add(RuleSets.ofList(PushFilterIntoTableSourceScanRule.INSTANCE,
+					FilterProjectTransposeRule.INSTANCE))
+				.build()
+		);
+		// name: STRING, id: LONG, amount: INT, price: DOUBLE
+		String ddl1 =
+			"CREATE TABLE MyTable (\n" +
+				"  name STRING,\n" +
+				"  id bigint,\n" +
+				"  amount int,\n" +
+				"  price double\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'filterable-fields' = 'amount',\n" +
+				" 'bounded' = 'true'\n" +
+				")";
+		util().tableEnv().executeSql(ddl1);
+
+		String ddl2 =
+			"CREATE TABLE VirtualTable (\n" +
+				"  name STRING,\n" +
+				"  id bigint,\n" +
+				"  amount int,\n" +
+				"  virtualField as amount + 1,\n" +
+				"  price double\n" +
+				") WITH (\n" +
+				" 'connector' = 'values',\n" +
+				" 'filterable-fields' = 'amount',\n" +
+				" 'bounded' = 'true'\n" +
+				")";
+
+		util().tableEnv().executeSql(ddl2);
+	}
+
+	@Override
+	public void testLowerUpperPushdown() {
+		String ddl3 =

Review comment:
       +1

Review comment:
       ok







[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452603535



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(

Review comment:
       ok







[GitHub] [flink] flinkbot edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7efb346a9767797762801f29a4d5ac0843f123f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] liuyongvs commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-656568865


   Hi @godfreyhe,
   Due to a careless operation I dropped my GitHub repo, so I opened a new PR here: https://github.com/apache/flink/pull/12866. The third commit addresses all of your review comments.





[GitHub] [flink] flinkbot commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655380026


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 7efb346a9767797762801f29a4d5ac0843f123f3 (Wed Jul 08 08:43:17 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
 The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] KurtYoung commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
KurtYoung commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-659785168


   @liuyongvs could you close this then?





[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452585081



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsFilterPushDown && tableSourceTable.extraDigests().length == 0) {
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+		pushFilterIntoScan(call, filter, scan, table);
+
+	}
+
+	private void pushFilterIntoScan(
+		RelOptRuleCall call,
+		Filter filter,
+		LogicalTableScan scan,
+		FlinkPreparingTableBase relOptTable) {
+
+		RelBuilder relBuilder = call.builder();
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
+		Tuple2<Expression[], RexNode[]> tuple2 =
+			RexNodeExtractor.extractConjunctiveConditions(
+				filter.getCondition(),
+				maxCnfNodeCount,
+				filter.getInput().getRowType().getFieldNames(),
+				relBuilder.getRexBuilder(),
+				context.getFunctionCatalog(),
+				context.getCatalogManager(),
+				TimeZone.getTimeZone(scan.getCluster().getPlanner().getContext()
+					.unwrap(FlinkContext.class).getTableConfig().getLocalTimeZone()));
+		Expression[] predicates = tuple2._1;
+		RexNode[] unconvertedRexNodes = tuple2._2;
+		if (predicates.length == 0) {
+			// no condition can be translated to expression
+			return;
+		}
+
+		List<Expression> remainingPredicates = new LinkedList<>();
+		remainingPredicates.addAll(Arrays.asList(predicates));
+		//record size before applyFilters for update statistics
+		int originPredicatesSize = remainingPredicates.size();
+
+		//Update DynamicTableSource
+		TableSourceTable oldTableSourceTable = relOptTable.unwrap(TableSourceTable.class);
+		DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			context.getTableConfig(),
+			name -> Optional.empty(),
+			context.getFunctionCatalog().asLookup(str -> {
+				throw new TableException("We should not need to lookup any expressions at this point");
+			}),
+			context.getCatalogManager().getDataTypeFactory())
+			.build();
+		SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates));
+
+		//Update statistics
+		FlinkStatistic oldStatistic = oldTableSourceTable.getStatistic();
+		FlinkStatistic newStatistic = null;
+		//record size after applyFilters for update statistics
+		int updatedPredicatesSize = result.getRemainingFilters().size();
+		if (originPredicatesSize == updatedPredicatesSize) {
+			// Keep all Statistics if no predicates can be pushed down
+			newStatistic = oldStatistic;
+		} else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
+			newStatistic = oldStatistic;
+		} else {
+			// Remove tableStats after predicates pushed down
+			newStatistic = FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
+		}
+
+		//Update extraDigests
+		String[] oldExtraDigests = oldTableSourceTable.extraDigests();
+		String[] newExtraDigests = null;
+		if (!result.getAcceptedFilters().isEmpty()) {
+			String extraDigests = "filter=["
+				+ result.getAcceptedFilters().stream().reduce((l, r) -> new CallExpression(AND, Arrays.asList(l, r), DataTypes.BOOLEAN())).get().toString()
+				+ "]";
+			newExtraDigests = Stream.concat(Arrays.stream(oldExtraDigests), Arrays.stream(new String[]{extraDigests})).toArray(String[]::new);
+		} else {
+			newExtraDigests = oldExtraDigests;
+		}
+			//set the newStatistic newTableSource and extraDigests
+		TableSourceTable newTableSourceTable = new TableSourceTable(
+			oldTableSourceTable.getRelOptSchema(),
+			oldTableSourceTable.tableIdentifier(),
+			oldTableSourceTable.getRowType(),
+			newStatistic,
+			newTableSource,
+			oldTableSourceTable.isStreamingMode(),
+			oldTableSourceTable.catalogTable(),
+			oldTableSourceTable.dynamicOptions(),
+			newExtraDigests
+		);
+		TableScan newScan = new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable);

Review comment:
       Why? It doesn't seem necessary.







[GitHub] [flink] flinkbot edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4342",
       "triggerID" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "423bccc86580b6a811598fb1dc2f27fce607a1e2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4336",
       "triggerID" : "423bccc86580b6a811598fb1dc2f27fce607a1e2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2 UNKNOWN
   * b79337059dea45b43d88cf3e4b07309dcd711d2b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4342) 
   * 423bccc86580b6a811598fb1dc2f27fce607a1e2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4336) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7efb346a9767797762801f29a4d5ac0843f123f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330) 
   * 7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2 UNKNOWN
   * b79337059dea45b43d88cf3e4b07309dcd711d2b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452582119



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+
+/**
+ * Planner rule that tries to push a filter into a [[LogicalTableScan]].
+ */
+public class PushFilterIntoTableSourceScanRule extends RelOptRule {
+	public static final PushFilterIntoTableSourceScanRule INSTANCE = new PushFilterIntoTableSourceScanRule();
+
+	public PushFilterIntoTableSourceScanRule() {
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushFilterIntoTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		TableConfig config = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
+		if (!config.getConfiguration().getBoolean(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
+			return false;
+		}
+
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+
+		LogicalTableScan scan = call.rel(1);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);

Review comment:
       > @liuyongvs Thanks for the contribution, I left some comments, please add some more test in `TableSourceTest`
   
   I reuse the existing TableSourceTest tests, so if the logic was correct before, it should remain correct here.
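
   As an aside, here is a minimal sketch of the kind of additional plan test being requested; the test name, the chosen predicate, and the `verifyPlan` helper on `util()` are illustrative assumptions rather than code from this PR:

   @Test
   public void testPartialFilterPushdown() {
       // 'amount' is declared in 'filterable-fields', so amount > 2 should be
       // pushed into the source scan, while price > 10 (not filterable) should
       // stay in the Filter node of the resulting plan.
       util().verifyPlan("SELECT * FROM MyTable WHERE amount > 2 AND price > 10");
   }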







[GitHub] [flink] flinkbot edited a comment on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655384097


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330",
       "triggerID" : "7efb346a9767797762801f29a4d5ac0843f123f3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b79337059dea45b43d88cf3e4b07309dcd711d2b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "423bccc86580b6a811598fb1dc2f27fce607a1e2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "423bccc86580b6a811598fb1dc2f27fce607a1e2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7efb346a9767797762801f29a4d5ac0843f123f3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4330) 
   * 7a1f2dd7590a3b1816e619db2c80d543cbdcf7d2 UNKNOWN
   * b79337059dea45b43d88cf3e4b07309dcd711d2b UNKNOWN
   * 423bccc86580b6a811598fb1dc2f27fce607a1e2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452219045



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
 			this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
 		}
 
+		@Override
+		public Result applyFilters(List<ResolvedExpression> filters) {
+			List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+			List<ResolvedExpression> remainingFilters = new ArrayList<>();
+			for (ResolvedExpression expr : filters) {
+				if (shouldPushDown(expr)) {
+					acceptedFilters.add(expr);
+				} else {
+					remainingFilters.add(expr);
+				}
+			}
+			this.filterPredicates = acceptedFilters;
+			return Result.of(acceptedFilters, remainingFilters);
+		}
+
+		private Boolean shouldPushDown(Expression expr) {
+			if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+				return shouldPushDownUnaryExpression(expr.getChildren().get(0))
+					&& shouldPushDownUnaryExpression(expr.getChildren().get(1));
+			}
+			return false;
+		}
+
+		private boolean shouldPushDownUnaryExpression(Expression expr) {
+			if (expr instanceof FieldReferenceExpression) {
+				if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+					return true;
+				}
+			}
+
+			if (expr instanceof ValueLiteralExpression) {
+				return true;
+			}
+
+			if (expr instanceof CallExpression && expr.getChildren().size() == 1) {
+				if (((CallExpression) expr).getFunctionDefinition().equals(UPPER)
+					|| ((CallExpression) expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+					return shouldPushDownUnaryExpression(expr.getChildren().get(0));
+				}
+			}
+			// other resolved expressions return false
+			return false;
+		}
+
+		private Boolean applyPredicatesToRow(Row row) {
+			if (filterPredicates == null) {
+				return true;
+			}
+			for (ResolvedExpression expr : filterPredicates) {
+				if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+					if (!binaryFilterApplies((CallExpression) expr, row)) {
+						return false;
+					}
+				} else {
+					throw new RuntimeException(expr + " not supported!");
+				}
+			}
+			return true;
+		}
+
+		private boolean binaryFilterApplies(CallExpression binExpr, Row row) {
+			List<Expression> children = binExpr.getChildren();
+			Preconditions.checkArgument(children.size() == 2);
+			Tuple2<Comparable, Comparable> tuple2 = extractValues(binExpr, row);

Review comment:
       We should limit the supported types of the filterable fields in the `applyFilters` method; otherwise we can't cast the extracted values to `Comparable` directly, since some types are not `Comparable`.
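
       As a sketch of one way to do that: reject pushdown for any field whose logical type is not known to map to a Comparable Java value. The COMPARABLE_TYPE_ROOTS set and the helper name below are assumptions for illustration, not part of this PR:

       // Imports this snippet relies on (inside TestValuesTableFactory):
       // import org.apache.flink.table.expressions.FieldReferenceExpression;
       // import org.apache.flink.table.types.logical.LogicalTypeRoot;
       // import java.util.EnumSet;
       // import java.util.Set;

       // Hypothetical guard: only push down predicates on fields whose type
       // maps to a Comparable Java value (numeric and string types here).
       private static final Set<LogicalTypeRoot> COMPARABLE_TYPE_ROOTS = EnumSet.of(
           LogicalTypeRoot.TINYINT,
           LogicalTypeRoot.SMALLINT,
           LogicalTypeRoot.INTEGER,
           LogicalTypeRoot.BIGINT,
           LogicalTypeRoot.FLOAT,
           LogicalTypeRoot.DOUBLE,
           LogicalTypeRoot.VARCHAR);

       private boolean isComparableFieldType(FieldReferenceExpression fieldRef) {
           LogicalTypeRoot typeRoot =
               fieldRef.getOutputDataType().getLogicalType().getTypeRoot();
           return COMPARABLE_TYPE_ROOTS.contains(typeRoot);
       }

       Wiring such a check into shouldPushDownUnaryExpression would route predicates on
       non-comparable fields into the remaining filters, so the planner still applies
       them after the scan.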







[GitHub] [flink] liuyongvs closed pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs closed pull request #12851:
URL: https://github.com/apache/flink/pull/12851


   





[GitHub] [flink] liuyongvs commented on a change in pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452580883



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -240,6 +264,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
 		String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
 		boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
+		Optional<List<String>> fields = helper.getOptions().getOptional(FILTERABLE_FIELDS);

Review comment:
       ok







[GitHub] [flink] godfreyhe commented on pull request #12851: [FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource.

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #12851:
URL: https://github.com/apache/flink/pull/12851#issuecomment-655857964


   @liuyongvs @wuchong I will review this today

