Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/03 08:45:40 UTC

[GitHub] [flink] qingwei91 opened a new pull request, #20140: Flink 16024/1.16 jdbc filter

qingwei91 opened a new pull request, #20140:
URL: https://github.com/apache/flink/pull/20140

   ## What is the purpose of the change
   
   Implement filter pushdown in the JDBC connector source. This is an optimization that avoids scanning the whole table into Flink when we only need a subset of it.
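   For illustration (a hypothetical table, not taken from this PR's tests), the effect on the query the connector sends to the database looks like this:
   
       // Hypothetical example: the query a user runs through Flink SQL.
       String userQuery = "SELECT id, name FROM orders WHERE id > 100";
   
       // Without filter pushdown, the JDBC source issues a full scan and
       // Flink applies the predicate in memory:
       String withoutPushdown = "SELECT id, name FROM orders";
   
       // With pushdown, the supported predicate is embedded in the JDBC query:
       String withPushdown = "SELECT id, name FROM orders WHERE id > 100";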
   
   ## Brief change log
   
   * Implement a SQL visitor that converts the Flink SQL expression AST into a JDBC SQL string
   * Implement filter pushdown in `JdbcDynamicTableSource` using the visitor
   * Add tests for both the visitor and `JdbcDynamicTableSource`
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   * Added an integration test in JdbcDynamicTableSourceITCase to make sure there is no regression in SQL filtering
   * Added JdbcFilterPushdownVisitorTest to verify that we generate SQL correctly
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not documented)
   




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1184077632

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1297518207

   @libenchao 
   
   > Thanks for the update. Please do not use 'squash' or 'force-push' unless you must or the reviewer asks you. (I went through all the code again and left several minor comments.)
   
   Sorry, with the last commit I meant to bundle all the changes I've made since your last review into https://github.com/apache/flink/pull/20140/commits/55d5227fe040890f0c89e366a5ae19c6c75c1d21, so that you could review just that without the noise. I won't do it again.
   
   I will go through the comments and fix them.




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r925920284


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk a Expression AST. Produces a String that can be used to pushdown the filter,
  * return Optional.empty() if we cannot pushdown the filter.
  */
-@Public

Review Comment:
   Hi @JingGe, I believe it has to be PublicEvolving because this class is returned by a public method in JdbcDialect, which is a PublicEvolving class.
   
   I will sort out the commits; sorry for the confusion.





[GitHub] [flink] flinkbot commented on pull request #20140: Flink 16024/1.16 jdbc filter

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1173040184

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4780c64c8a7bafdd767f21190fca1b11ad9dabb0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4780c64c8a7bafdd767f21190fca1b11ad9dabb0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4780c64c8a7bafdd767f21190fca1b11ad9dabb0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] MartijnVisser commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1199074023

   @qingwei91 I can't review this since it is not my area of expertise. I would like to ask @hailin0 to help with a review, given https://github.com/apache/flink/pull/20304 - otherwise I would like to ask @leonardBang if he can help out.




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296222402

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<?>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Class<?> typeCs = tpe.getClass();
+
+        if (SUPPORTED_DATA_TYPES.contains(typeCs)) {

Review Comment:
   Great idea! I hadn't discovered this typeRoot mechanism earlier; thanks for the suggestion.
   
   https://github.com/apache/flink/pull/20140/commits/dd54aedb1a71eb66fc37857c86a0c8e32ff8e4b6
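   For context, a minimal sketch of the typeRoot-based check (simplified; the actual commit may differ in which type roots it accepts):
   
       // Sketch: dispatch on the logical type root instead of comparing
       // LogicalType classes against a set.
       LogicalType tpe = litExp.getOutputDataType().getLogicalType();
       switch (tpe.getTypeRoot()) {
           case INTEGER:
           case BIGINT:
           case VARCHAR:
               // Supported literal: render a '?' placeholder; the real code
               // also records the literal value as the bound parameter.
               return Optional.of(new ParameterizedPredicate("?"));
           default:
               // Unsupported type: give up on pushing down this predicate.
               return Optional.empty();
       }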





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1183432699

   @flinkbot run azure




[GitHub] [flink] MartijnVisser commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r924911685


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk a Expression AST. Produces a String that can be used to pushdown the filter,
  * return Optional.empty() if we cannot pushdown the filter.
  */
-@Public

Review Comment:
   > Do you mean we cannot expose a new API?
   
   You can definitely add a new API to the `master` branch, but the reason why it's problematic here is that you've changed the status of the API across your commits. That means that when a reviewer checks individual commits, it looks like you removed `@Public` and then, in a later commit, added `@PublicEvolving` again. It's better to squash your commits in this case to avoid this kind of confusion.





[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r928095964


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk a Expression AST. Produces a String that can be used to pushdown the filter,
  * return Optional.empty() if we cannot pushdown the filter.
  */
-@Public

Review Comment:
   I've squashed the commits now and we have a green build.
   
   Could any of you review this again? Thank you!





[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1011302797


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -93,39 +99,74 @@ private Optional<ParameterizedPredicate> renderBinaryOperator(
                 left -> rightOperandString.map(right -> left.combine(operator, right)));
     }
 
+    private Optional<ParameterizedPredicate> renderUnaryOperator(
+            String operator, ResolvedExpression operand, Boolean operandOnLeft) {

Review Comment:
   Thanks, committed 👍 





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1275195142

   Hi @libenchao, I finally got around to implementing your suggestion. Do you mind taking a look again?
   
   I didn't implement additional operators like IN and BETWEEN because I have been quite busy of late. Given that they can be implemented incrementally later, and it's not going to break anything, I wonder if we can defer those to a different ticket?
   
   




[GitHub] [flink] JingGe commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
JingGe commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r921863300


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk a Expression AST. Produces a String that can be used to pushdown the filter,
  * return Optional.empty() if we cannot pushdown the filter.
  */
-@Public

Review Comment:
   It is not allowed to change a public interface. Please check [FLIP-196: Source API stability guarantees](https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees) for more information.





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1289455651

   Hi @libenchao, thank you very much for your review 👍 
   
   I've addressed all of your concerns. On the TablePlanTest, do you mind checking whether that's how it is supposed to work? I don't think I understand the internals well enough to judge.




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1009804824


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Serializable[] params = new Serializable[1];
+
+        ParameterizedPredicate predicate = new ParameterizedPredicate("?");
+        switch (tpe.getTypeRoot()) {

Review Comment:
   Yes, will do





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1298952830

   Hi @libenchao, this is the new commit I added to address your comments.
   
   I also added support for IS NULL and IS NOT NULL, as these two are quite common. Roughly, the wiring looks like the sketch below.
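   (A sketch based on the renderUnaryOperator signature quoted elsewhere in this thread, not the exact committed code:)
   
       // Unary operators render their single operand on the left-hand side,
       // producing e.g. `my_col IS NULL`.
       if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
           return renderUnaryOperator("IS NULL", call.getResolvedChildren().get(0), true);
       }
       if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
           return renderUnaryOperator("IS NOT NULL", call.getResolvedChildren().get(0), true);
       }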




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1288133847

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008502626


##########
flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml:
##########
@@ -51,4 +51,22 @@ TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decima
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFilterPushdown">
+	<Resource name="sql">
+		<![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> '11:11:11.000111' OR double_col >= -1000.23]]>
+	</Resource>
+	<Resource name="ast">
+		<![CDATA[
+LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, _UTF-16LE'11:11:11.000111')), >=($5, -1000.23:DECIMAL(6, 2)))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+	</Resource>
+	<Resource name="optimized exec plan">
+		<![CDATA[
+Calc(select=[id, time_col, real_col], where=[((time_col <> '11:11:11.000111') OR (double_col >= -1000.23))])

Review Comment:
   I figured it out: it's because the column type Time wasn't supported. I've added support here: https://github.com/apache/flink/pull/20140/commits/74fe5eed158482920a9ec36ac8a7aad549ee94fa
   
   Thanks for calling it out.





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1295483171

   > One small tip: do not rebase/force-push before the reviewer asks you, because it makes it hard for the reviewer to do an incremental review.
   
   Sorry, my bad. I was advised to squash in a previous PR, but of course that should only be done right before merging.




[GitHub] [flink] grzegorz8 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by "grzegorz8 (via GitHub)" <gi...@apache.org>.
grzegorz8 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1113508926


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private final Function<String, String> quoteIdentifierFunction;
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {

Review Comment:
   @qingwei91 - Is there any reason why LIKE was not added here? It seems to be as easy as the other binary operators below, something like the sketch that follows.
   I will gladly add LIKE, but I just want to make sure I didn't overlook something tricky.
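   (Just a sketch mirroring the existing operators; dialect-specific pattern syntax would still need checking:)
   
       // LIKE takes a column reference and a pattern literal, so it can
       // presumably reuse the same binary-operator rendering as =, <, etc.
       if (BuiltInFunctionDefinitions.LIKE.equals(call.getFunctionDefinition())) {
           return renderBinaryOperator("LIKE", call.getResolvedChildren());
       }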





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1209099214

   Hi @libenchao, oh I see, I didn't spot that `JdbcRowDataInputFormat` is using PreparedStatement under the hood.
   
   Thanks for pointing it out. This will be a larger change than I expected; I will give it a go.
   
   > 1. What functions should we support, e.g. IN, BETWEEN?
   
   I think we can tackle this incrementally. I believe IN is supported out of the box, because Flink compiles IN into multiple X=Y conditions chained together by OR. I never looked into BETWEEN, though.
   
   > 2. Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract it out to make it dialect specific?
   
   I think ultimately we need to allow dialect-specific behavior. Right now I have designed it such that the query generator (i.e. JdbcFilterPushdownVisitor) is part of JdbcDialect, so each dialect can provide its own instance to deal with it, roughly as in the sketch below. Do you think this design is okay? Or is there a better way?
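   (A sketch of the idea; the method name is illustrative rather than the exact API in this PR, and the constructor argument mirrors the quoteIdentifierFunction used by the visitor quoted elsewhere in this thread:)
   
       // Each dialect hands back its own visitor instance; the default one
       // reuses the dialect's identifier quoting.
       public interface JdbcDialect extends Serializable {
           String quoteIdentifier(String identifier);
   
           // Illustrative method name, not necessarily the one in the PR:
           default JdbcFilterPushdownVisitor getFilterPushdownVisitor() {
               return new JdbcFilterPushdownVisitor(this::quoteIdentifier);
           }
       }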
   
   




[GitHub] [flink] JingGe commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
JingGe commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r924934341


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk a Expression AST. Produces a String that can be used to pushdown the filter,
  * return Optional.empty() if we cannot pushdown the filter.
  */
-@Public

Review Comment:
   Hi @qingwei91, could you please explain why a public API is required? Would @Internal work too? Thanks.





[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<?>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Class<?> typeCs = tpe.getClass();
+
+        if (SUPPORTED_DATA_TYPES.contains(typeCs)) {

Review Comment:
   Great idea! I hadn't discovered this typeRoot mechanism earlier; thanks for the suggestion.
   
   https://github.com/apache/flink/pull/20140/commits/d9b14572a0661f145cab5344a8c9b2076b75b049





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296238300

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1002677936


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    Function<String, String> quoteIdentifierFunction;
+
+    public static Set<Class<?>> supportedDataTypes;
+
+    static {
+        supportedDataTypes = new HashSet<>();
+        supportedDataTypes.add(IntType.class);
+        supportedDataTypes.add(BigIntType.class);
+        supportedDataTypes.add(BooleanType.class);
+        supportedDataTypes.add(DecimalType.class);
+        supportedDataTypes.add(DoubleType.class);
+        supportedDataTypes.add(FloatType.class);
+        supportedDataTypes.add(SmallIntType.class);
+        supportedDataTypes.add(VarCharType.class);
+        supportedDataTypes.add(TimestampType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());

Review Comment:
   Will do 👍 





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1208594479

   Hi @libenchao, thanks for the review!
   
   Thanks for pointing out the flaw; you're right. 🙇 
   
   On your recommended approach: I believe `JdbcFilterPushdownVisitor` needs to produce strings or some equivalent data so that it can be used by `JdbcDynamicTableSource::getScanRuntimeProvider`. How would I make use of PreparedStatement here? Maybe I am missing something?
   
   I think you're pointing out a fundamental issue with this PR: SQL statement generation has to be dialect-specific, and trying to provide a default implementation might be a lost cause here.
   
   If we cannot go down the prepared-statement route, I can think of two ideas:
   
   1. Implement a dialect-specific method that converts a `ValueLiteralExpression` to a SQL string literal, and have `JdbcFilterPushdownVisitor` make use of it (see the sketch after this list). We don't have to support every dialect in one go; we can simply fall back to the in-memory filter where it's not supported.
   2. Make `JdbcFilterPushdownVisitor` itself dialect-specific, so that every implementation deals with dialect-specific concerns, including literal stringification. This is similar to the current approach; the only difference is that this PR currently provides a default implementation, whereas this option would stop doing that. My implementation is tested and used in production for SQL Server. Maybe I can rename it and make it specific to the SQL Server JDBC dialect? (The SQL Server dialect PR #20235 is still open, so this would have a dependency on it.) Likewise, we can fall back to in-memory filtering for dialects without an implementation.
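   (A hypothetical hook for option 1, just to make the idea concrete; the method name is invented:)
   
       // Hypothetical dialect method: render a literal into this dialect's
       // SQL syntax, or return empty if the dialect cannot represent it,
       // in which case we fall back to in-memory filtering.
       Optional<String> renderLiteral(ValueLiteralExpression literal);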
   
   I think option 1 is less code, but probably more fiddly and easier to break. Option 2 is likely going to be more code, but the separation is cleaner and less likely to break.
   
   Let me know your thoughts. 😄 




[GitHub] [flink] libenchao commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1208843267

   > I believe JdbcFilterPushdownVisitor needs to produce strings or some equivalent data, so that it can be used by JdbcDynamicTableSource::getScanRuntimeProvider, how should I make use of PreparedStatement here? Maybe I am missing something?
   
   @qingwei91 Currently `JdbcRowDataInputFormat` already uses `PreparedStatement`, and 'scan.partition' is implemented using it, hence we can do it. `JdbcFilterPushdownVisitor` does not necessarily need to return `String`; it can return anything we need.
   We may need to rethink the design, especially on the following points:
   1. What functions should we support, e.g. `IN`, `BETWEEN`?
   2. Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract it out to make it dialect specific?




[GitHub] [flink] libenchao commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1209118740

   > I believe IN is supported out of the box, because Flink compile IN into multiple X=Y condition chained together by OR
   
   Not exactly, we have a threshold (default 4): https://github.com/apache/flink/blob/208f08b406a7fd48890cda16d317a30ee892a2e7/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ConvertToNotInOrInRule.scala#L47
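   Concretely, for illustration:
   
       // Fewer values than the threshold: the planner produces chained ORs,
       // which this visitor can already push down.
       String smallIn   = "WHERE id IN (1, 2, 3)";
       String rewritten = "WHERE id = 1 OR id = 2 OR id = 3";
       // At or above the threshold (default 4): the predicate reaches the
       // connector as an IN, which would need explicit support here.
       String largeIn   = "WHERE id IN (1, 2, 3, 4)";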
   
   > I think we can tackle this incrementally?
   
   I agree, we can start with some common functions, such as `=`, `<>`, `<`, `>`, `<=`, `>=`, `IS NULL`, `IS NOT NULL`, `IN`, and leave the others for the future.




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r925920284


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk a Expression AST. Produces a String that can be used to pushdown the filter,
  * return Optional.empty() if we cannot pushdown the filter.
  */
-@Public

Review Comment:
   Hi @JingGe, I believe it has to be PublicEvolving because this class is returned by a public method in JdbcDialect, which is a PublicEvolving class. Here's an example where it fails if I mark it as Internal: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38493&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461
   
   I will sort out the commits; sorry for the confusion.





[GitHub] [flink] grzegorz8 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
grzegorz8 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r985666582


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if
+ * we cannot push down the filter.
+ */
+@PublicEvolving
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    Function<String, String> quoteIdentifierFunction;
+
+    public static Set<Class<?>> supportedDataTypes;
+
+    static {
+        supportedDataTypes = new HashSet<>();
+        supportedDataTypes.add(IntType.class);
+        supportedDataTypes.add(BigIntType.class);
+        supportedDataTypes.add(BooleanType.class);
+        supportedDataTypes.add(DecimalType.class);
+        supportedDataTypes.add(DoubleType.class);
+        supportedDataTypes.add(FloatType.class);
+        supportedDataTypes.add(SmallIntType.class);
+        supportedDataTypes.add(VarCharType.class);
+        supportedDataTypes.add(TimestampType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Supplier<Optional<ParameterizedPredicate>> rightOperandString =
+                () -> allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.get().map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Class<?> typeCs = tpe.getClass();
+
+        if (supportedDataTypes.contains(typeCs)) {
+            ParameterizedPredicate predicate = new ParameterizedPredicate("?");
+
+            Serializable[] params = new Serializable[1];
+
+            if (typeCs.equals(VarCharType.class)) {
+                params[0] = litExp.getValueAs(String.class).orElse(null);
+            }
+            if (typeCs.equals(BigIntType.class)) {
+                params[0] = litExp.getValueAs(Long.class).orElse(null);
+            }
+            if (typeCs.equals(IntType.class) || typeCs.equals(SmallIntType.class)) {
+                params[0] = litExp.getValueAs(Integer.class).orElse(null);
+            }
+            if (typeCs.equals(DoubleType.class)) {
+                params[0] = litExp.getValueAs(Double.class).orElse(null);
+            }
+            if (typeCs.equals(BooleanType.class)) {
+                params[0] = litExp.getValueAs(Boolean.class).orElse(null);
+            }
+            if (typeCs.equals(FloatType.class)) {
+                params[0] = litExp.getValueAs(Float.class).orElse(null);
+            }
+            if (typeCs.equals(DecimalType.class)) {
+                params[0] = litExp.getValueAs(BigDecimal.class).orElse(null);
+            }
+            if (typeCs.equals(DateType.class)) {
+                params[0] = litExp.getValueAs(LocalDate.class).map(Date::valueOf).orElse(null);
+            }
+            if (typeCs.equals(TimestampType.class)) {
+                params[0] =
+                        litExp.getValueAs(LocalDateTime.class).map(Timestamp::valueOf).orElse(null);

Review Comment:
   Timestamps might be tricky. What if one database accepts `yyyy-MM-dd'T'HH:mm:ss.SSSZ` while another one requires `yyyy-MM-dd HH:mm:ss.SSSZ` (note there is no `'T'`)? The format might differ per dialect.
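   
   For illustration (hypothetical formats; what is actually accepted depends on the database and driver):
   
   ```java
   // Rendering the literal as text bakes one format into the SQL string:
   //   WHERE ts = TIMESTAMP '2022-01-01 10:00:00.000'   -- accepted by one DB
   //   WHERE ts = TIMESTAMP '2022-01-01T10:00:00.000'   -- required by another
   // Binding the value instead leaves the encoding to the JDBC driver:
   PreparedStatement stmt =
           connection.prepareStatement("SELECT id FROM t WHERE ts = ?");
   stmt.setTimestamp(1, Timestamp.valueOf(LocalDateTime.of(2022, 1, 1, 10, 0)));
   ```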





[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: Flink 16024/1.16 jdbc filter

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r912452959


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java:
##########
@@ -142,4 +143,18 @@ Optional<String> getUpsertStatement(
      */
     String getSelectFromStatement(
             String tableName, String[] selectFields, String[] conditionFields);
+
+    /**
+     * Get FilterPushdownVisitor that can convert Flink SQL Filter Expression into corresponding SQL
+     * dialect Filter Expression. The resulting string can then be pushdown to SQL data source to
+     * optimize the query.
+     *
+     * <p>You can customize the rendering for your dialect by overriding this method, and extends
+     * from {@link JdbcFilterPushdownVisitor}
+     *
+     * @return {@link JdbcFilterPushdownVisitor}
+     */
+    default JdbcFilterPushdownVisitor getFilterPushdownVisitor() {

Review Comment:
   This should be dialect-specific so that it can be specialized for each dialect's SQL syntax.
   
   I noticed we removed all default implementations from this interface; should I do the same for this method?
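   
   For illustration, a dialect could specialize it roughly like this (hypothetical dialect class; constructor signature assumed):
   
   ```java
   public class SomeDialect extends AbstractDialect {
       @Override
       public JdbcFilterPushdownVisitor getFilterPushdownVisitor() {
           // Reuse this dialect's identifier quoting so the generated
           // predicate matches the dialect's SQL syntax.
           return new JdbcFilterPushdownVisitor(this::quoteIdentifier);
       }
       // other JdbcDialect methods elided
   }
   ```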



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -31,28 +31,41 @@
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** A {@link DynamicTableSource} for JDBC. */
 @Internal
 public class JdbcDynamicTableSource
         implements ScanTableSource,
                 LookupTableSource,
                 SupportsProjectionPushDown,
-                SupportsLimitPushDown {
+                SupportsLimitPushDown,
+                SupportsFilterPushDown {

Review Comment:
   This adds Filter Pushdown support to the JDBC source.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -171,17 +205,60 @@ public boolean equals(Object o) {
                 && Objects.equals(lookupOptions, that.lookupOptions)
                 && Objects.equals(physicalRowDataType, that.physicalRowDataType)
                 && Objects.equals(dialectName, that.dialectName)
-                && Objects.equals(limit, that.limit);
+                && Objects.equals(limit, that.limit)
+                && Objects.equals(resolvedPredicates, that.resolvedPredicates);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
-                options, readOptions, lookupOptions, physicalRowDataType, dialectName, limit);
+                options,
+                readOptions,
+                lookupOptions,
+                physicalRowDataType,
+                dialectName,
+                limit,
+                resolvedPredicates);
     }
 
     @Override
     public void applyLimit(long limit) {
         this.limit = limit;
     }
+
+    /**
+     * This method makes use of {@link JdbcFilterPushdownVisitor} to generate dialect-specific SQL
+     * expression. The visitor returns Optional.empty() for filter that it cannot handle, which will
+     * then be handled in Flink runtime.
+     */
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {

Review Comment:
   Core implementation of this change:
   
   We traverse each ResolvedExpression and produce a String if we know how to push it down to the SQL database, returning Optional.empty() if we cannot handle it.
   
   Unhandled expressions are kept as Flink SQL filters and evaluated in the job at runtime.
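   
   A simplified sketch of that flow (field names approximated, not the exact code in this PR):
   
   ```java
   @Override
   public Result applyFilters(List<ResolvedExpression> filters) {
       List<ResolvedExpression> acceptedFilters = new ArrayList<>();
       List<ResolvedExpression> remainingFilters = new ArrayList<>();
       JdbcFilterPushdownVisitor visitor = dialect.getFilterPushdownVisitor();
       for (ResolvedExpression filter : filters) {
           Optional<String> predicate = filter.accept(visitor);
           if (predicate.isPresent()) {
               acceptedFilters.add(filter);
               this.resolvedPredicates.add(predicate.get());
           } else {
               remainingFilters.add(filter);
           }
       }
       // Flink re-applies the remaining filters in the job at runtime.
       return Result.of(acceptedFilters, remainingFilters);
   }
   ```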





[GitHub] [flink] MartijnVisser commented on pull request #20140: Flink 16024/1.16 jdbc filter

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1174938568

   @qingwei91 Thanks for the PR. Can you make sure that your PR title and especially your commit messages conform to the Code Contribution guide? https://flink.apache.org/contributing/contribute-code.html
   
   For example, your commit message should start with `[FLINK-16024][Connector][JDBC]` in this case. 




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1295483454

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296273034

   @flinkbot run azure




[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008821587


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if
+ * we cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<?>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Class<?> typeCs = tpe.getClass();
+
+        if (SUPPORTED_DATA_TYPES.contains(typeCs)) {

Review Comment:
   How about change this to something like:
   ```java
   switch(tpe.getTypeRoot()) {
     case INTEGER: ...
     case VARCHAR: ...
     ...
     default:
       return Optional.empty();
   }
   ```





[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1011111371


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -93,39 +99,74 @@ private Optional<ParameterizedPredicate> renderBinaryOperator(
                 left -> rightOperandString.map(right -> left.combine(operator, right)));
     }
 
+    private Optional<ParameterizedPredicate> renderUnaryOperator(
+            String operator, ResolvedExpression operand, Boolean operandOnLeft) {

Review Comment:
   ```suggestion
               String operator, ResolvedExpression operand, boolean operandOnLeft) {
   ```





[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1002679105


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java:
##########
@@ -264,6 +264,146 @@ void testLimit() throws Exception {
                 .containsAll(result);
     }
 
+    @Test
+    public void testFilter() throws Exception {

Review Comment:
   Sure, will do



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorITCase.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.connector.jdbc.JdbcTestBase;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.derby.DerbyDialectFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.apache.calcite.rex.RexBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link JdbcFilterPushdownPreparedStatementVisitor}. */
+public class JdbcFilterPushdownPreparedStatementVisitorITCase extends AbstractTestBase {

Review Comment:
   My bad, that was a mistake.





[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1272509188

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r924891272


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk an Expression AST. Produces a String that can be used to push down the filter;
  * returns Optional.empty() if we cannot push down the filter.
  */
-@Public

Review Comment:
   Hi @JingGe, did you get a chance to check on this?





[GitHub] [flink] JingGe commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
JingGe commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r924934341


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk an Expression AST. Produces a String that can be used to push down the filter;
  * returns Optional.empty() if we cannot push down the filter.
  */
-@Public

Review Comment:
   Hi @qingwei91, could you please explain why a public API is required here? Does @Internal work too? Thanks.





[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1003920728


##########
flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml:
##########
@@ -51,4 +51,22 @@ TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decima
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFilterPushdown">
+	<Resource name="sql">
+		<![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> '11:11:11.000111' OR double_col >= -1000.23]]>
+	</Resource>
+	<Resource name="ast">
+		<![CDATA[
+LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, _UTF-16LE'11:11:11.000111')), >=($5, -1000.23:DECIMAL(6, 2)))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+	</Resource>
+	<Resource name="optimized exec plan">
+		<![CDATA[
+Calc(select=[id, time_col, real_col], where=[((time_col <> '11:11:11.000111') OR (double_col >= -1000.23))])

Review Comment:
   Why does the condition still exist in the Calc node?





[GitHub] [flink] libenchao commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1207632475

   @qingwei91 Thanks for your contribution. In the current design, I see you use `ValueLiteralExpression#toString()` to generate the string for literals. This may work in some cases, but IMHO not in all of them.
   Consider the following cases:
   1. `ValueLiteralExpression#toString()` uses the Flink dialect to represent strings, and it hard-codes the quote character as `'`. However, in many DBMSs, `'` is not the only choice.
   2. `ValueLiteralExpression#toString()` only escapes the special character `'`. Many DBMSs need more special-character handling, e.g. [mysql driver](https://github.com/mysql/mysql-connector-j/blob/release/8.0/src/main/protocol-impl/java/com/mysql/cj/protocol/a/StringValueEncoder.java#L72)
   3. Other types, e.g. `TIMESTAMP`, `DATE`, `TIME`, `INTERVAL` and so on, may suffer from this too, because we cannot assume that all DB dialects handle them in the same way.
   
   A more general way to handle this is to use `PreparedStatement.setXXX`, just like we already do in `TableSimpleStatementExecutor` and `JdbcRowDataLookupFunction`. WDYT?
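   
   Roughly like this (sketch only; table and column names made up):
   
   ```java
   // Keep a '?' placeholder in the pushed-down predicate and let the JDBC
   // driver encode the value, instead of rendering the literal into the SQL:
   PreparedStatement stmt = connection.prepareStatement(
           "SELECT id, name FROM t WHERE name = ? AND ts > ?");
   stmt.setString(1, "O'Brien"); // no manual quoting/escaping needed
   stmt.setTimestamp(2, Timestamp.valueOf("2022-01-01 00:00:00"));
   ResultSet rs = stmt.executeQuery();
   ```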




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1288189339

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296359376

   @flinkbot run azure




[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1009034445


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if
+ * we cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Serializable[] params = new Serializable[1];
+
+        ParameterizedPredicate predicate = new ParameterizedPredicate("?");
+        switch (tpe.getTypeRoot()) {

Review Comment:
   1. Could you sort the case branches in the same order as `LogicalTypeRoot`?
   2. There are still some remaining types, such as `CHAR`, `SMALLINT` and `TINYINT`; I would suggest we support as many of these types as possible, unless we cannot for now (see the sketch below).
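   
   For point 2, the extra branches could follow the existing pattern, e.g. (sketch):
   
   ```java
   case CHAR:
       params[0] = litExp.getValueAs(String.class).orElse(null);
       predicate.setParameters(params);
       return Optional.of(predicate);
   case SMALLINT:
       params[0] = litExp.getValueAs(Short.class).orElse(null);
       predicate.setParameters(params);
       return Optional.of(predicate);
   case TINYINT:
       params[0] = litExp.getValueAs(Byte.class).orElse(null);
       predicate.setParameters(params);
       return Optional.of(predicate);
   ```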



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if
+ * we cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Serializable[] params = new Serializable[1];
+
+        ParameterizedPredicate predicate = new ParameterizedPredicate("?");
+        switch (tpe.getTypeRoot()) {
+            case VARCHAR:
+                params[0] = litExp.getValueAs(String.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case BIGINT:
+                params[0] = litExp.getValueAs(Long.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case INTEGER:
+                params[0] = litExp.getValueAs(Integer.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case DOUBLE:
+                params[0] = litExp.getValueAs(Double.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case BOOLEAN:
+                params[0] = litExp.getValueAs(Boolean.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case FLOAT:
+                params[0] = litExp.getValueAs(Float.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case DECIMAL:
+                params[0] = litExp.getValueAs(BigDecimal.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case DATE:
+                params[0] = litExp.getValueAs(LocalDate.class).map(Date::valueOf).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case TIME_WITHOUT_TIME_ZONE:
+                params[0] = litExp.getValueAs(java.sql.Time.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                params[0] =
+                        litExp.getValueAs(LocalDateTime.class).map(Timestamp::valueOf).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            default:
+                return Optional.empty();
+        }
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(FieldReferenceExpression fieldReference) {
+        String predicateStr = (this.quoteIdentifierFunction.apply(fieldReference.toString()));

Review Comment:
   Redundant `(` `)` here.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if
+ * we cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;

Review Comment:
   `final`





[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if
+ * we cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<?>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+        return leftOperandString.flatMap(
+                left -> rightOperandString.map(right -> left.combine(operator, right)));
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Class<?> typeCs = tpe.getClass();
+
+        if (SUPPORTED_DATA_TYPES.contains(typeCs)) {

Review Comment:
   Great idea! I hadn't discovered `getTypeRoot` earlier; thanks for the suggestion.
   
   https://github.com/apache/flink/pull/20140/commits/dd54aedb1a71eb66fc37857c86a0c8e32ff8e4b6





[GitHub] [flink] libenchao closed pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao closed pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
URL: https://github.com/apache/flink/pull/20140




[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1183524474

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r922363732


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk an Expression AST. Produces a String that can be used to push down the filter;
  * returns Optional.empty() if we cannot push down the filter.
  */
-@Public

Review Comment:
   Hi @JingGe, thanks for the comment.
   
   This is a newly added file, and in my latest commit I marked it as PublicEvolving: https://github.com/apache/flink/pull/20140/commits/6c97da0eb0d27774ed9954fbabed7b514ac8ce02
   
   Do you mean we cannot expose a new API?
   For instance, I added this here: https://github.com/apache/flink/pull/20140/files#diff-ae60653ffe2ac890a3c1b01da41405bcc4e6913949176c36edc009df5090c38fR157, which adds a new method and a new return type to `JdbcDialect` (itself PublicEvolving), because filter pushdown may differ across JDBC dialects. Is this approach a problem?
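
   To make the shape of that change concrete, a rough sketch of such a dialect hook is below. The method name, return type, and default are assumptions for illustration; the actual signature is in the linked diff:

   ```java
   // Illustrative sketch only, not the exact API added in this PR.
   public interface JdbcDialect extends Serializable {
       // ... existing dialect methods ...

       // A dialect can opt in to filter pushdown by supplying a visitor that
       // renders predicates in its own SQL flavour.
       default Optional<JdbcFilterPushdownVisitor> getFilterPushdownVisitor(
               Function<String, String> quoteIdentifier) {
           return Optional.empty(); // no pushdown by default
       }
   }
   ```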





[GitHub] [flink] JingGe commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
JingGe commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r921863300


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownVisitor.java:
##########
@@ -53,7 +52,6 @@
  * Visitor to walk an Expression AST. Produces a String that can be used to push down the filter;
  * returns Optional.empty() if we cannot push down the filter.
  */
-@Public

Review Comment:
   Changing a public interface is not allowed. Please check [FLIP-196: Source API stability guarantees](https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees) for more information.





[GitHub] [flink] HeChuanXUPT commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
HeChuanXUPT commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1237699265

   Wow, thanks, it works for me~




[GitHub] [flink] libenchao commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1275500147

   @qingwei91 Thanks for the update; I'll review this PR in a few days.
   
   > I didn't implement additional operators like IN and BETWEEN because I have been quite busy lately. Given that they can be implemented later incrementally, and it's not going to break anything, I wonder if we can defer those to a different ticket?
   
   I'm ok with this.




[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by GitBox <gi...@apache.org>.
libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1002654132


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if we
+ * cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    Function<String, String> quoteIdentifierFunction;
+
+    public static Set<Class<?>> supportedDataTypes;

Review Comment:
   These two fields can both be `private`?



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -159,8 +206,12 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
 
     @Override
     public DynamicTableSource copy() {
-        return new JdbcDynamicTableSource(
-                options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
+        JdbcDynamicTableSource newSource =
+                new JdbcDynamicTableSource(
+                        options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
+        newSource.resolvedPredicates = this.resolvedPredicates;
+        newSource.pushdownParams = this.pushdownParams;

Review Comment:
   Make a deep copy of these two params, because they are mutable.
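
   For example (a sketch; the field types are inferred from the diff above):

   ```java
   // Copy the mutable pushdown state instead of sharing references with `this`:
   newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
   newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
   return newSource;
   ```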



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -117,21 +137,48 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
                         options.getTableName(),
                         DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
                         new String[0]);
+        final List<String> predicates = new ArrayList<String>();
+
         if (readOptions.getPartitionColumnName().isPresent()) {
             long lowerBound = readOptions.getPartitionLowerBound().get();
             long upperBound = readOptions.getPartitionUpperBound().get();
             int numPartitions = readOptions.getNumPartitions().get();
+
+            Serializable[][] allPushdownParams = replicatePushdownParamsForN(numPartitions);
+            JdbcParameterValuesProvider allParams =
+                    new CompositeJdbcParameterValuesProvider(
+                            new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
+                                    .ofBatchNum(numPartitions),
+                            new JdbcGenericParameterValuesProvider(allPushdownParams));
+
+            builder.setParametersProvider(allParams);
+
+            predicates.add(
+                    dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
+                            + " BETWEEN ? AND ?");
+        } else {
             builder.setParametersProvider(
-                    new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
-                            .ofBatchNum(numPartitions));
-            query +=
-                    " WHERE "
-                            + dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
-                            + " BETWEEN ? AND ?";
+                    new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
         }
+
+        predicates.addAll(this.resolvedPredicates);
+
+        if (predicates.size() > 0) {
+            String joinedConditions =
+                    predicates.stream()
+                            .map(pred -> String.format("(%s)", pred))
+                            .collect(Collectors.joining(" AND "));
+            query += " WHERE " + joinedConditions;
+        }
+
         if (limit >= 0) {
             query = String.format("%s %s", query, dialect.getLimitClause(limit));
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Query generated for JDBC scan: " + query);
+        }

Review Comment:
   It's not necessary to wrap this in `if (log.isDebugEnabled())`?
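
   With SLF4J's parameterized logging the guard is indeed redundant, since the message is only formatted when DEBUG is actually enabled:

   ```java
   LOG.debug("Query generated for JDBC scan: {}", query);
   ```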



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java:
##########
@@ -264,6 +264,146 @@ void testLimit() throws Exception {
                 .containsAll(result);
     }
 
+    @Test
+    public void testFilter() throws Exception {

Review Comment:
   Could you also add a test in `JdbcTablePlanTest` like `JdbcTablePlanTest#testLimitPushDown`?
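
   A hypothetical shape for such a test, following the `testLimitPushDown` convention (table name and predicate here are placeholders):

   ```java
   @Test
   public void testFilterPushdown() {
       // The optimized plan should show the predicate pushed into the JDBC
       // table scan rather than kept in a separate Calc node.
       util.verifyExecPlan("SELECT id, description FROM jdbc_table WHERE id > 10");
   }
   ```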



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -22,33 +22,51 @@
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
+import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** A {@link DynamicTableSource} for JDBC. */
 @Internal
 public class JdbcDynamicTableSource
         implements ScanTableSource,
                 LookupTableSource,
                 SupportsProjectionPushDown,
-                SupportsLimitPushDown {
+                SupportsLimitPushDown,
+                SupportsFilterPushDown {
+    private static Logger log = LoggerFactory.getLogger(JdbcDynamicTableSource.class);

Review Comment:
   `log` -> `LOG`



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorITCase.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.connector.jdbc.JdbcTestBase;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.derby.DerbyDialectFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.apache.calcite.rex.RexBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link JdbcFilterPushdownPreparedStatementVisitor}. */
+public class JdbcFilterPushdownPreparedStatementVisitorITCase extends AbstractTestBase {

Review Comment:
   Why do you name this an `ITCase` instead of a normal UT?



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if we
+ * cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    Function<String, String> quoteIdentifierFunction;
+
+    public static Set<Class<?>> supportedDataTypes;

Review Comment:
   And we usually use upper case for static field names; also, make fields `final` wherever possible.
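
   Putting both points together, the declarations would look like this (sketch):

   ```java
   private final Function<String, String> quoteIdentifierFunction;

   private static final Set<Class<?>> SUPPORTED_DATA_TYPES = new HashSet<>();
   ```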



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if we
+ * cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    Function<String, String> quoteIdentifierFunction;
+
+    public static Set<Class<?>> supportedDataTypes;
+
+    static {
+        supportedDataTypes = new HashSet<>();
+        supportedDataTypes.add(IntType.class);
+        supportedDataTypes.add(BigIntType.class);
+        supportedDataTypes.add(BooleanType.class);
+        supportedDataTypes.add(DecimalType.class);
+        supportedDataTypes.add(DoubleType.class);
+        supportedDataTypes.add(FloatType.class);
+        supportedDataTypes.add(SmallIntType.class);
+        supportedDataTypes.add(VarCharType.class);
+        supportedDataTypes.add(TimestampType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());

Review Comment:
   How about adding `<>` too since it's also a simple binary operator.
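
   That is one more branch mirroring the `=` case, which is what the final diff does (see the `renderBinaryOperator("<>", ...)` hunk quoted earlier in this thread):

   ```java
   if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
       return renderBinaryOperator("<>", call.getResolvedChildren());
   }
   ```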





[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

Posted by "qingwei91 (via GitHub)" <gi...@apache.org>.
qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1114915489


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns Optional.empty() if we
+ * cannot push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private final Function<String, String> quoteIdentifierFunction;
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {

Review Comment:
   Hi @grzegorz8, I think we simply missed it; thanks for noticing this.


