Posted to commits@flink.apache.org by li...@apache.org on 2022/11/02 11:19:56 UTC

[flink] branch master updated: [FLINK-16024][connector/jdbc] Support filter pushdown

This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 496700197b9 [FLINK-16024][connector/jdbc] Support filter pushdown
496700197b9 is described below

commit 496700197b9e2ca60f6cdd3eb01fa4a12227548a
Author: Qing Lim <q....@mwam.com>
AuthorDate: Wed Jun 22 12:55:20 2022 +0100

    [FLINK-16024][connector/jdbc] Support filter pushdown
    
    This closes #20140
---
 .../CompositeJdbcParameterValuesProvider.java      |  58 ++++
 .../jdbc/table/JdbcDynamicTableSource.java         | 113 +++++++-
 ...JdbcFilterPushdownPreparedStatementVisitor.java | 201 ++++++++++++++
 .../jdbc/table/ParameterizedPredicate.java         |  59 ++++
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   | 140 ++++++++++
 ...FilterPushdownPreparedStatementVisitorTest.java | 297 +++++++++++++++++++++
 .../connector/jdbc/table/JdbcTablePlanTest.java    |   6 +
 .../connector/jdbc/table/JdbcTablePlanTest.xml     |  17 ++
 8 files changed, 880 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
new file mode 100644
index 00000000000..eebeac5abe0
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/** Combines two {@link JdbcParameterValuesProvider}s into one. */
+@Internal
+public class CompositeJdbcParameterValuesProvider implements JdbcParameterValuesProvider {
+    private final JdbcParameterValuesProvider a;
+    private final JdbcParameterValuesProvider b;
+
+    public CompositeJdbcParameterValuesProvider(
+            JdbcParameterValuesProvider a, JdbcParameterValuesProvider b) {
+        Preconditions.checkArgument(
+                a.getParameterValues().length == b.getParameterValues().length,
+                "Both JdbcParameterValuesProvider should have the same length.");
+        this.a = a;
+        this.b = b;
+    }
+
+    @Override
+    public Serializable[][] getParameterValues() {
+        // Evaluate both providers once up front instead of once per batch.
+        Serializable[][] aParams = this.a.getParameterValues();
+        Serializable[][] bParams = this.b.getParameterValues();
+        int batchNum = aParams.length;
+        Serializable[][] parameters = new Serializable[batchNum][];
+        for (int i = 0; i < batchNum; i++) {
+            Serializable[] aSlice = aParams[i];
+            Serializable[] bSlice = bParams[i];
+            Serializable[] batchParams = new Serializable[aSlice.length + bSlice.length];
+
+            System.arraycopy(aSlice, 0, batchParams, 0, aSlice.length);
+            System.arraycopy(bSlice, 0, batchParams, aSlice.length, bSlice.length);
+            parameters[i] = batchParams;
+        }
+        return parameters;
+    }
+}
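
For context, here is a minimal standalone sketch of how the two providers are composed (table and column names and the literal values are illustrative, not from this commit): the numeric provider supplies the BETWEEN bounds for each partition, and the generic provider replays the pushed-down filter parameters for every partition, so each batch lines up one-to-one with the "?" placeholders in the generated query.

    import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
    import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
    import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
    import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;

    import java.io.Serializable;
    import java.math.BigDecimal;

    public class CompositeProviderSketch {
        public static void main(String[] args) {
            // Two partition batches over ids in [0, 9]: bounds [0, 4] and [5, 9].
            JdbcParameterValuesProvider partitionParams =
                    new JdbcNumericBetweenParametersProvider(0, 9).ofBatchNum(2);
            // One pushdown parameter, replicated once per partition batch.
            Serializable[][] pushdownParams = {
                {new BigDecimal("100.5")}, {new BigDecimal("100.5")}
            };
            JdbcParameterValuesProvider composed =
                    new CompositeJdbcParameterValuesProvider(
                            partitionParams,
                            new JdbcGenericParameterValuesProvider(pushdownParams));
            // Each batch is now [lower, upper, 100.5], matching a query shaped like
            // "... WHERE (id BETWEEN ? AND ?) AND ((decimal_col > ?))".
            Serializable[][] batches = composed.getParameterValues();
        }
    }
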
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 0bc7ae04792..e27cf63df50 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -22,25 +22,41 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
+import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** A {@link DynamicTableSource} for JDBC. */
 @Internal
@@ -48,7 +64,9 @@ public class JdbcDynamicTableSource
         implements ScanTableSource,
                 LookupTableSource,
                 SupportsProjectionPushDown,
-                SupportsLimitPushDown {
+                SupportsLimitPushDown,
+                SupportsFilterPushDown {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class);
 
     private final JdbcConnectorOptions options;
     private final JdbcReadOptions readOptions;
@@ -57,6 +75,8 @@ public class JdbcDynamicTableSource
     private DataType physicalRowDataType;
     private final String dialectName;
     private long limit = -1;
+    private List<String> resolvedPredicates = new ArrayList<>();
+    private Serializable[] pushdownParams = new Serializable[0];
 
     public JdbcDynamicTableSource(
             JdbcConnectorOptions options,
@@ -117,21 +137,46 @@ public class JdbcDynamicTableSource
                         options.getTableName(),
                         DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
                         new String[0]);
+        final List<String> predicates = new ArrayList<>();
+
         if (readOptions.getPartitionColumnName().isPresent()) {
             long lowerBound = readOptions.getPartitionLowerBound().get();
             long upperBound = readOptions.getPartitionUpperBound().get();
             int numPartitions = readOptions.getNumPartitions().get();
+
+            Serializable[][] allPushdownParams = replicatePushdownParamsForN(numPartitions);
+            JdbcParameterValuesProvider allParams =
+                    new CompositeJdbcParameterValuesProvider(
+                            new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
+                                    .ofBatchNum(numPartitions),
+                            new JdbcGenericParameterValuesProvider(allPushdownParams));
+
+            builder.setParametersProvider(allParams);
+
+            predicates.add(
+                    dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
+                            + " BETWEEN ? AND ?");
+        } else {
             builder.setParametersProvider(
-                    new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
-                            .ofBatchNum(numPartitions));
-            query +=
-                    " WHERE "
-                            + dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
-                            + " BETWEEN ? AND ?";
+                    new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
         }
+
+        predicates.addAll(this.resolvedPredicates);
+
+        if (!predicates.isEmpty()) {
+            String joinedConditions =
+                    predicates.stream()
+                            .map(pred -> String.format("(%s)", pred))
+                            .collect(Collectors.joining(" AND "));
+            query += " WHERE " + joinedConditions;
+        }
+
         if (limit >= 0) {
             query = String.format("%s %s", query, dialect.getLimitClause(limit));
         }
+
+        LOG.debug("Query generated for JDBC scan: " + query);
+
         builder.setQuery(query);
         final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
         builder.setRowConverter(dialect.getRowConverter(rowType));
@@ -159,8 +204,12 @@ public class JdbcDynamicTableSource
 
     @Override
     public DynamicTableSource copy() {
-        return new JdbcDynamicTableSource(
-                options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
+        JdbcDynamicTableSource newSource =
+                new JdbcDynamicTableSource(
+                        options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
+        newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
+        newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
+        return newSource;
     }
 
     @Override
@@ -183,7 +232,9 @@ public class JdbcDynamicTableSource
                 && Objects.equals(cache, that.cache)
                 && Objects.equals(physicalRowDataType, that.physicalRowDataType)
                 && Objects.equals(dialectName, that.dialectName)
-                && Objects.equals(limit, that.limit);
+                && Objects.equals(limit, that.limit)
+                && Objects.equals(resolvedPredicates, that.resolvedPredicates)
+                && Arrays.deepEquals(pushdownParams, that.pushdownParams);
     }
 
     @Override
@@ -195,11 +246,51 @@ public class JdbcDynamicTableSource
                 cache,
                 physicalRowDataType,
                 dialectName,
-                limit);
+                limit,
+                resolvedPredicates,
+                pushdownParams);
     }
 
     @Override
     public void applyLimit(long limit) {
         this.limit = limit;
     }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+        List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+        for (ResolvedExpression filter : filters) {
+            Optional<ParameterizedPredicate> simplePredicate = parseFilterToPredicate(filter);
+            if (simplePredicate.isPresent()) {
+                acceptedFilters.add(filter);
+                ParameterizedPredicate pred = simplePredicate.get();
+                this.pushdownParams = ArrayUtils.addAll(this.pushdownParams, pred.getParameters());
+                this.resolvedPredicates.add(pred.getPredicate());
+            } else {
+                remainingFilters.add(filter);
+            }
+        }
+
+        return Result.of(acceptedFilters, remainingFilters);
+    }
+
+    private Optional<ParameterizedPredicate> parseFilterToPredicate(ResolvedExpression filter) {
+        if (filter instanceof CallExpression) {
+            CallExpression callExp = (CallExpression) filter;
+            return callExp.accept(
+                    new JdbcFilterPushdownPreparedStatementVisitor(
+                            this.options.getDialect()::quoteIdentifier));
+        }
+        return Optional.empty();
+    }
+
+    private Serializable[][] replicatePushdownParamsForN(int n) {
+        Serializable[][] allPushdownParams = new Serializable[n][];
+        for (int i = 0; i < n; i++) {
+            allPushdownParams[i] = this.pushdownParams;
+        }
+        return allPushdownParams;
+    }
 }
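
To make the query assembly above concrete, here is a small self-contained sketch of the WHERE-clause joining step (query text and predicate strings are illustrative): the optional partition predicate comes first, followed by the pushed-down predicates, which arrive already parenthesized from the visitor; every entry is wrapped once more and the results are AND-ed together.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class WhereClauseSketch {
        public static void main(String[] args) {
            String query = "SELECT id, decimal_col FROM orders";
            // Partition predicate first, then the visitor's pushdown predicates.
            List<String> predicates = Arrays.asList("id BETWEEN ? AND ?", "(decimal_col > ?)");
            if (!predicates.isEmpty()) {
                query +=
                        " WHERE "
                                + predicates.stream()
                                        .map(pred -> String.format("(%s)", pred))
                                        .collect(Collectors.joining(" AND "));
            }
            // Prints: SELECT id, decimal_col FROM orders
            //         WHERE (id BETWEEN ? AND ?) AND ((decimal_col > ?))
            System.out.println(query);
        }
    }
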
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
new file mode 100644
index 00000000000..f9622a8a37d
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an {@link Expression} into a {@link ParameterizedPredicate}. Returns
+ * {@code Optional.empty()} if the filter cannot be pushed down.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private final Function<String, String> quoteIdentifierFunction;
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("AND", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
+            return renderUnaryOperator("IS NULL", call.getResolvedChildren().get(0), true);
+        }
+        if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
+            return renderUnaryOperator("IS NOT NULL", call.getResolvedChildren().get(0), true);
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ParameterizedPredicate> renderBinaryOperator(
+            String operator, List<ResolvedExpression> allOperands) {
+        Optional<ParameterizedPredicate> leftOperand = allOperands.get(0).accept(this);
+
+        Optional<ParameterizedPredicate> rightOperand = allOperands.get(1).accept(this);
+
+        return leftOperand.flatMap(
+                left -> rightOperand.map(right -> left.combine(operator, right)));
+    }
+
+    private Optional<ParameterizedPredicate> renderUnaryOperator(
+            String operator, ResolvedExpression operand, boolean operandOnLeft) {
+        if (operand instanceof FieldReferenceExpression) {
+            Optional<ParameterizedPredicate> fieldPartialPredicate =
+                    this.visit((FieldReferenceExpression) operand);
+            if (operandOnLeft) {
+                return fieldPartialPredicate.map(
+                        fieldPred ->
+                                new ParameterizedPredicate(
+                                        String.format(
+                                                "(%s %s)", fieldPred.getPredicate(), operator)));
+            } else {
+                return fieldPartialPredicate.map(
+                        fieldPred ->
+                                new ParameterizedPredicate(
+                                        String.format(
+                                                "(%s %s)", operator, fieldPred.getPredicate())));
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+        LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+        Serializable[] params = new Serializable[1];
+
+        ParameterizedPredicate predicate = new ParameterizedPredicate("?");
+        switch (tpe.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                params[0] = litExp.getValueAs(String.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case BOOLEAN:
+                params[0] = litExp.getValueAs(Boolean.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case DECIMAL:
+                params[0] = litExp.getValueAs(BigDecimal.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case TINYINT:
+                params[0] = litExp.getValueAs(Byte.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case SMALLINT:
+                params[0] = litExp.getValueAs(Short.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case INTEGER:
+                params[0] = litExp.getValueAs(Integer.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case BIGINT:
+                params[0] = litExp.getValueAs(Long.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case FLOAT:
+                params[0] = litExp.getValueAs(Float.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case DOUBLE:
+                params[0] = litExp.getValueAs(Double.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case DATE:
+                params[0] = litExp.getValueAs(LocalDate.class).map(Date::valueOf).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case TIME_WITHOUT_TIME_ZONE:
+                params[0] = litExp.getValueAs(java.sql.Time.class).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                params[0] =
+                        litExp.getValueAs(LocalDateTime.class).map(Timestamp::valueOf).orElse(null);
+                predicate.setParameters(params);
+                return Optional.of(predicate);
+            default:
+                return Optional.empty();
+        }
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(FieldReferenceExpression fieldReference) {
+        String predicateStr = this.quoteIdentifierFunction.apply(fieldReference.toString());
+        ParameterizedPredicate predicate = new ParameterizedPredicate(predicateStr);
+        return Optional.of(predicate);
+    }
+
+    @Override
+    protected Optional<ParameterizedPredicate> defaultMethod(Expression expression) {
+        return Optional.empty();
+    }
+}
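
To make the visitor's output concrete, here is a hand-driven walk of what it produces for a filter like "id = 6 AND decimal_col > 10.5" (identifiers are left unquoted for brevity; the real visitor applies the dialect's quoting): field references become identifiers, literals become "?" placeholders carrying their value, and combine() folds the pieces together with the operator.

    import org.apache.flink.connector.jdbc.table.ParameterizedPredicate;

    import java.io.Serializable;
    import java.math.BigDecimal;

    public class VisitorOutputSketch {
        public static void main(String[] args) {
            // Field references render as identifiers...
            ParameterizedPredicate id = new ParameterizedPredicate("id");
            ParameterizedPredicate decimalCol = new ParameterizedPredicate("decimal_col");
            // ...and literals render as "?" placeholders carrying their value.
            ParameterizedPredicate six = new ParameterizedPredicate("?");
            six.setParameters(new Serializable[] {6L});
            ParameterizedPredicate threshold = new ParameterizedPredicate("?");
            threshold.setParameters(new Serializable[] {new BigDecimal("10.5")});

            ParameterizedPredicate result =
                    id.combine("=", six).combine("AND", decimalCol.combine(">", threshold));
            // result.getPredicate()  -> "((id = ?) AND (decimal_col > ?))"
            // result.getParameters() -> [6, 10.5]
        }
    }
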
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java
new file mode 100644
index 00000000000..82a9a856c76
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+
+/** A data class that models a parameterized SQL predicate. */
+@Experimental
+public class ParameterizedPredicate {
+    private String predicate;
+    private Serializable[] parameters;
+
+    public ParameterizedPredicate(String predicate) {
+        this.predicate = predicate;
+        this.parameters = new Serializable[0];
+    }
+
+    public Serializable[] getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(Serializable[] parameters) {
+        this.parameters = parameters;
+    }
+
+    public String getPredicate() {
+        return predicate;
+    }
+
+    public void setPredicate(String predicate) {
+        this.predicate = predicate;
+    }
+
+    public ParameterizedPredicate combine(String operator, ParameterizedPredicate that) {
+        this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate);
+        this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
+        return this;
+    }
+}
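
One design note worth calling out: combine() mutates the receiver and returns it rather than building a new instance, which is why the table source can fold accepted filters into one accumulating predicate. A tiny sketch of that behavior (predicate strings are illustrative):

    import org.apache.flink.connector.jdbc.table.ParameterizedPredicate;

    public class CombineMutationSketch {
        public static void main(String[] args) {
            ParameterizedPredicate left = new ParameterizedPredicate("a = ?");
            ParameterizedPredicate right = new ParameterizedPredicate("b = ?");
            ParameterizedPredicate combined = left.combine("OR", right);
            // combine() rewrote the receiver in place to "((a = ?) OR (b = ?))"
            // and appended right's parameters to it, so this prints "true".
            System.out.println(combined == left);
        }
    }
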
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index c982e4c1a6c..d23c5d194cd 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -264,6 +264,146 @@ public class JdbcDynamicTableSourceITCase {
                 .containsAll(result);
     }
 
+    @Test
+    public void testFilter() throws Exception {
+        String partitionedTable = "PARTITIONED_TABLE";
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + INPUT_TABLE
+                        + "("
+                        + "id BIGINT,"
+                        + "timestamp6_col TIMESTAMP(6),"
+                        + "timestamp9_col TIMESTAMP(9),"
+                        + "time_col TIME,"
+                        + "real_col FLOAT,"
+                        + "double_col DOUBLE,"
+                        + "decimal_col DECIMAL(10, 4)"
+                        + ") WITH ("
+                        + "  'connector'='jdbc',"
+                        + "  'url'='"
+                        + DB_URL
+                        + "',"
+                        + "  'table-name'='"
+                        + INPUT_TABLE
+                        + "'"
+                        + ")");
+
+        // create a partitioned table to ensure no regression
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + partitionedTable
+                        + "("
+                        + "id BIGINT,"
+                        + "timestamp6_col TIMESTAMP(6),"
+                        + "timestamp9_col TIMESTAMP(9),"
+                        + "time_col TIME,"
+                        + "real_col FLOAT,"
+                        + "double_col DOUBLE,"
+                        + "decimal_col DECIMAL(10, 4)"
+                        + ") WITH ("
+                        + "  'connector'='jdbc',"
+                        + "  'url'='"
+                        + DB_URL
+                        + "',"
+                        + "  'table-name'='"
+                        + INPUT_TABLE
+                        + "',"
+                        + "  'scan.partition.column'='id',\n"
+                        + "  'scan.partition.num'='1',\n"
+                        + "  'scan.partition.lower-bound'='1',\n"
+                        + "  'scan.partition.upper-bound'='1'\n"
+                        + ")");
+
+        // we create a VIEW here to test column remapping, i.e. whether filter pushdown still
+        // works when we query a view that renames the columns of our source table
+        tEnv.executeSql(
+                String.format(
+                        "CREATE VIEW FAKE_TABLE ("
+                                + "idx, timestamp6_col, timestamp9_col, time_col, real_col, double_col, decimal_col"
+                                + ") as (SELECT * from %s )",
+                        INPUT_TABLE));
+
+        List<String> onlyRow1 =
+                Stream.of(
+                                "+I[1, 2020-01-01T15:35:00.123456, 2020-01-01T15:35:00.123456789, 15:35, 1.175E-37, 1.79769E308, 100.1234]")
+                        .collect(Collectors.toList());
+
+        List<String> twoRows =
+                Stream.of(
+                                "+I[1, 2020-01-01T15:35:00.123456, 2020-01-01T15:35:00.123456789, 15:35, 1.175E-37, 1.79769E308, 100.1234]",
+                                "+I[2, 2020-01-01T15:36:01.123456, 2020-01-01T15:36:01.123456789, 15:36:01, -1.175E-37, -1.79769E308, 101.1234]")
+                        .collect(Collectors.toList());
+
+        List<String> onlyRow2 =
+                Stream.of(
+                                "+I[2, 2020-01-01T15:36:01.123456, 2020-01-01T15:36:01.123456789, 15:36:01, -1.175E-37, -1.79769E308, 101.1234]")
+                        .collect(Collectors.toList());
+        List<String> noRows = new ArrayList<>();
+
+        // test simple filter
+        assertQueryReturns("SELECT * FROM FAKE_TABLE WHERE idx = 1", onlyRow1);
+        // test TIMESTAMP filter
+        assertQueryReturns(
+                "SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'",
+                onlyRow1);
+        // test the IN operator
+        assertQueryReturns(
+                "SELECT * FROM "
+                        + "FAKE_TABLE"
+                        + " WHERE 1 = idx AND decimal_col IN (100.1234, 101.1234)",
+                onlyRow1);
+        // test mixing the AND and OR operators
+        assertQueryReturns(
+                "SELECT * FROM "
+                        + "FAKE_TABLE"
+                        + " WHERE idx = 1 AND decimal_col = 100.1234 OR decimal_col = 101.1234",
+                twoRows);
+        // test mixing AND/OR with parentheses, and swapping the operands of the equality expression
+        assertQueryReturns(
+                "SELECT * FROM "
+                        + "FAKE_TABLE"
+                        + " WHERE (2 = idx AND decimal_col = 100.1234) OR decimal_col = 101.1234",
+                onlyRow2);
+
+        // test greater-than, just to make sure we didn't break anything for filters we cannot push down
+        assertQueryReturns(
+                "SELECT * FROM "
+                        + "FAKE_TABLE"
+                        + " WHERE idx = 2 AND decimal_col > 100 OR decimal_col = 101.123",
+                onlyRow2);
+
+        // one more test of parentheses
+        assertQueryReturns(
+                "SELECT * FROM "
+                        + "FAKE_TABLE"
+                        + " WHERE 2 = idx AND (decimal_col = 100.1234 OR real_col = 101.1234)",
+                noRows);
+
+        assertQueryReturns(
+                "SELECT * FROM "
+                        + partitionedTable
+                        + " WHERE id = 2 AND decimal_col > 100 OR decimal_col = 101.123",
+                noRows);
+
+        assertQueryReturns(
+                "SELECT * FROM "
+                        + partitionedTable
+                        + " WHERE 1 = id AND decimal_col IN (100.1234, 101.1234)",
+                onlyRow1);
+    }
+
+    private List<String> rowIterToList(Iterator<Row> rows) {
+        return CollectionUtil.iteratorToList(rows).stream()
+                .map(Row::toString)
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    private void assertQueryReturns(String query, List<String> expected) {
+        List<String> actual = rowIterToList(tEnv.executeSql(query).collect());
+        assertThat(actual).isEqualTo(expected);
+    }
+
     @ParameterizedTest
     @EnumSource(Caching.class)
     void testLookupJoin(Caching caching) throws Exception {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
new file mode 100644
index 00000000000..e7eb8a43a66
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.connector.jdbc.JdbcTestBase;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.derby.DerbyDialectFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rex.RexBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link JdbcFilterPushdownPreparedStatementVisitor}. */
+public class JdbcFilterPushdownPreparedStatementVisitorTest {
+
+    private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+    public static final String DB_URL = "jdbc:derby:memory:test";
+    public static final String INPUT_TABLE = "jdbDynamicTableSource";
+
+    public static StreamExecutionEnvironment env;
+    public static TableEnvironment tEnv;
+
+    @Before
+    public void before() throws ClassNotFoundException, SQLException {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+
+        System.setProperty(
+                "derby.stream.error.field", JdbcTestBase.class.getCanonicalName() + ".DEV_NULL");
+        Class.forName(DRIVER_CLASS);
+
+        try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+                Statement statement = conn.createStatement()) {
+            statement.executeUpdate(
+                    "CREATE TABLE "
+                            + INPUT_TABLE
+                            + " ("
+                            + "id BIGINT NOT NULL,"
+                            + "description VARCHAR(200) NOT NULL,"
+                            + "timestamp6_col TIMESTAMP, "
+                            + "timestamp9_col TIMESTAMP, "
+                            + "time_col TIME, "
+                            + "real_col FLOAT(23), "
+                            + // A precision of 23 or less makes FLOAT equivalent to REAL.
+                            "double_col FLOAT(24),"
+                            + // A precision of 24 or greater makes FLOAT equivalent to DOUBLE
+                            // PRECISION.
+                            "decimal_col DECIMAL(10, 4))");
+        }
+        // Create the table in Flink; it can be reused across test cases
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + INPUT_TABLE
+                        + "("
+                        + "id BIGINT,"
+                        + "description VARCHAR(200),"
+                        + "timestamp6_col TIMESTAMP(6),"
+                        + "timestamp9_col TIMESTAMP(9),"
+                        + "time_col TIME,"
+                        + "real_col FLOAT,"
+                        + "double_col DOUBLE,"
+                        + "decimal_col DECIMAL(10, 4)"
+                        + ") WITH ("
+                        + "  'connector'='jdbc',"
+                        + "  'url'='"
+                        + DB_URL
+                        + "',"
+                        + "  'table-name'='"
+                        + INPUT_TABLE
+                        + "'"
+                        + ")");
+    }
+
+    @After
+    public void clearOutputTable() throws Exception {
+        Class.forName(DRIVER_CLASS);
+        try (Connection conn = DriverManager.getConnection(DB_URL);
+                Statement stat = conn.createStatement()) {
+            stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
+        }
+        StreamTestSink.clear();
+    }
+
+    @Test
+    public void testSimpleExpressionPrimitiveType() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+        Arrays.asList(
+                        new Object[] {"id = 6", "id = ?", 6L},
+                        new Object[] {"id >= 6", "id >= ?", 6},
+                        new Object[] {"id > 6", "id > ?", 6},
+                        new Object[] {"id < 6", "id < ?", 6},
+                        new Object[] {"id <= 5", "id <= ?", 5},
+                        new Object[] {"description = 'Halo'", "description = ?", "Halo"},
+                        new Object[] {"real_col > 0.5", "real_col > ?", new BigDecimal("0.5")},
+                        new Object[] {
+                            "double_col <= -0.3", "double_col <= ?", new BigDecimal("-0.3")
+                        })
+                .forEach(
+                        inputs ->
+                                assertSimpleInputExprEqualsOutExpr(
+                                        (String) inputs[0],
+                                        schema,
+                                        (String) inputs[1],
+                                        (Serializable) inputs[2]));
+    }
+
+    @Test
+    public void testComplexExpressionDatetime() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+        String andExpr = "id = 6 AND timestamp6_col = TIMESTAMP '2022-01-01 07:00:01.333'";
+        Serializable[] expectedParams1 = {6L, Timestamp.valueOf("2022-01-01 07:00:01.333000")};
+        assertGeneratedSQLString(
+                andExpr, schema, "((id = ?) AND (timestamp6_col = ?))", expectedParams1);
+
+        Serializable[] expectedParams2 = {Timestamp.valueOf("2022-01-01 07:00:01.333"), "Halo"};
+        String orExpr =
+                "timestamp9_col = TIMESTAMP '2022-01-01 07:00:01.333' OR description = 'Halo'";
+        assertGeneratedSQLString(
+                orExpr, schema, "((timestamp9_col = ?) OR (description = ?))", expectedParams2);
+    }
+
+    @Test
+    public void testExpressionWithNull() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+        String andExpr = "id = NULL AND real_col <= 0.6";
+
+        Serializable[] expectedParams1 = {null, new BigDecimal("0.6")};
+        assertGeneratedSQLString(
+                andExpr, schema, "((id = ?) AND (real_col <= ?))", expectedParams1);
+
+        Serializable[] expectedParams2 = {6L, null};
+        String orExpr = "id = 6 OR description = NULL";
+        assertGeneratedSQLString(
+                orExpr, schema, "((id = ?) OR (description = ?))", expectedParams2);
+    }
+
+    @Test
+    public void testExpressionIsNull() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+        String andExpr = "id IS NULL AND real_col <= 0.6";
+
+        Serializable[] expectedParams1 = {new BigDecimal("0.6")};
+        assertGeneratedSQLString(
+                andExpr, schema, "((id IS NULL) AND (real_col <= ?))", expectedParams1);
+
+        Serializable[] expectedParams2 = {6L};
+        String orExpr = "id = 6 OR description IS NOT NULL";
+        assertGeneratedSQLString(
+                orExpr, schema, "((id = ?) OR (description IS NOT NULL))", expectedParams2);
+    }
+
+    @Test
+    public void testComplexExpressionPrimitiveType() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+        String andExpr = "id = NULL AND real_col <= 0.6";
+        Serializable[] expectedParams1 = {null, new BigDecimal("0.6")};
+        assertGeneratedSQLString(
+                andExpr, schema, "((id = ?) AND (real_col <= ?))", expectedParams1);
+
+        String orExpr = "id = 6 OR description = NULL";
+        Serializable[] expectedParams2 = {6L, null};
+        assertGeneratedSQLString(
+                orExpr, schema, "((id = ?) OR (description = ?))", expectedParams2);
+    }
+
+    private void assertGeneratedSQLString(
+            String inputExpr,
+            ResolvedSchema schema,
+            String expectedOutputExpr,
+            Serializable[] expectedParams) {
+        List<ResolvedExpression> resolved = resolveSQLFilterToExpression(inputExpr, schema);
+        assertEquals(1, resolved.size());
+        JdbcDialect dialect = new DerbyDialectFactory().create();
+        JdbcFilterPushdownPreparedStatementVisitor visitor =
+                new JdbcFilterPushdownPreparedStatementVisitor(dialect::quoteIdentifier);
+        ParameterizedPredicate pred = resolved.get(0).accept(visitor).get();
+
+        // our visitor always wraps the expression in parentheses
+        assertEquals(expectedOutputExpr, pred.getPredicate());
+        assertArrayEquals(expectedParams, pred.getParameters());
+    }
+
+    private void assertSimpleInputExprEqualsOutExpr(
+            String inputExpr, ResolvedSchema schema, String expectedOutput, Serializable param) {
+        // our visitor always wraps the expression in parentheses
+        Serializable[] params = new Serializable[1];
+        params[0] = param;
+        assertGeneratedSQLString(inputExpr, schema, "(" + expectedOutput + ")", params);
+    }
+
+    /**
+     * Resolves a SQL filter expression against a schema. This method relies on some
+     * implementation details of the Flink planner.
+     */
+    private List<ResolvedExpression> resolveSQLFilterToExpression(
+            String sqlExp, ResolvedSchema schema) {
+        StreamTableEnvironmentImpl tbImpl = (StreamTableEnvironmentImpl) tEnv;
+
+        FlinkContext ctx = ((PlannerBase) tbImpl.getPlanner()).getFlinkContext();
+        CatalogManager catMan = tbImpl.getCatalogManager();
+        FunctionCatalog funCat = ctx.getFunctionCatalog();
+        RowType sourceType = (RowType) schema.toSourceRowDataType().getLogicalType();
+
+        FlinkTypeFactory typeFactory = new FlinkTypeFactory(classLoader, FlinkTypeSystem.INSTANCE);
+        RexNodeToExpressionConverter converter =
+                new RexNodeToExpressionConverter(
+                        new RexBuilder(typeFactory),
+                        sourceType.getFieldNames().toArray(new String[0]),
+                        funCat,
+                        catMan,
+                        TimeZone.getTimeZone(tEnv.getConfig().getLocalTimeZone()));
+
+        RexNodeExpression rexExp =
+                (RexNodeExpression) tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
+        ResolvedExpression resolvedExp =
+                rexExp.getRexNode()
+                        .accept(converter)
+                        .getOrElse(
+                                () -> {
+                                    throw new IllegalArgumentException(
+                                            "Cannot convert "
+                                                    + rexExp.getRexNode()
+                                                    + " to Expression, this likely "
+                                                    + "means you used some function(s) not "
+                                                    + "supported with this setup.");
+                                });
+        ExpressionResolver resolver =
+                ExpressionResolver.resolverFor(
+                                tEnv.getConfig(),
+                                classLoader,
+                                name -> Optional.empty(),
+                                funCat.asLookup(
+                                        str -> {
+                                            throw new TableException(
+                                                    "We should not need to lookup any expressions at this point");
+                                        }),
+                                catMan.getDataTypeFactory(),
+                                (sqlExpression, inputRowType, outputType) -> {
+                                    throw new TableException(
+                                            "SQL expression parsing is not supported at this location.");
+                                })
+                        .build();
+
+        return resolver.resolve(Arrays.asList(resolvedExp));
+    }
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
index 642fafe2a80..adac5b2ac1f 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -58,4 +58,10 @@ public class JdbcTablePlanTest extends TableTestBase {
     public void testLimitPushDown() {
         util.verifyExecPlan("SELECT id, time_col FROM jdbc LIMIT 3");
     }
+
+    @Test
+    public void testFilterPushdown() {
+        util.verifyExecPlan(
+                "SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
+    }
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
index 2333a35ad84..b69903f3c36 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -51,4 +51,21 @@ TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decima
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFilterPushdown">
+	<Resource name="sql">
+		<![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
+	</Resource>
+	<Resource name="ast">
+		<![CDATA[
+LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5, -1000.23:DECIMAL(6, 2)))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+	</Resource>
+	<Resource name="optimized exec plan">
+		<![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, jdbc, filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))), OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))], project=[id, time_col, real_col]]], fields=[id, time_col, real_col])
+]]>
+	</Resource>
+  </TestCase>
 </Root>