Posted to commits@flink.apache.org by li...@apache.org on 2022/11/02 11:19:56 UTC
[flink] branch master updated: [FLINK-16024][connector/jdbc] Support filter pushdown
This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 496700197b9 [FLINK-16024][connector/jdbc] Support filter pushdown
496700197b9 is described below
commit 496700197b9e2ca60f6cdd3eb01fa4a12227548a
Author: Qing Lim <q....@mwam.com>
AuthorDate: Wed Jun 22 12:55:20 2022 +0100
[FLINK-16024][connector/jdbc] Support filter pushdown
This closes #20140
---
.../CompositeJdbcParameterValuesProvider.java | 58 ++++
.../jdbc/table/JdbcDynamicTableSource.java | 113 +++++++-
...JdbcFilterPushdownPreparedStatementVisitor.java | 201 ++++++++++++++
.../jdbc/table/ParameterizedPredicate.java | 59 ++++
.../jdbc/table/JdbcDynamicTableSourceITCase.java | 140 ++++++++++
...FilterPushdownPreparedStatementVisitorTest.java | 297 +++++++++++++++++++++
.../connector/jdbc/table/JdbcTablePlanTest.java | 6 +
.../connector/jdbc/table/JdbcTablePlanTest.xml | 17 ++
8 files changed, 880 insertions(+), 11 deletions(-)
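For context before the diff: filter pushdown lets the planner offer WHERE predicates to the
source, and any predicate the connector can translate is rendered into the generated JDBC scan
query as a parameterized condition, evaluated in the database instead of in Flink. A minimal
usage sketch, assuming a hypothetical Derby-backed table (names, URL, and query are illustrative
only, not taken from this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FilterPushdownSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Hypothetical JDBC table; connection options are placeholders.
            tEnv.executeSql(
                    "CREATE TABLE orders (id BIGINT, amount DECIMAL(10, 2)) WITH ("
                            + " 'connector'='jdbc',"
                            + " 'url'='jdbc:derby:memory:test',"
                            + " 'table-name'='orders')");
            // The planner hands the resolved predicates to
            // JdbcDynamicTableSource#applyFilters; accepted ones end up in the scan
            // query roughly as: SELECT ... FROM orders WHERE (id > ?) AND (amount <= ?)
            tEnv.executeSql("SELECT * FROM orders WHERE id > 10 AND amount <= 100.00").print();
        }
    }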
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
new file mode 100644
index 00000000000..eebeac5abe0
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/** Combines two {@link JdbcParameterValuesProvider}s into one. */
+@Internal
+public class CompositeJdbcParameterValuesProvider implements JdbcParameterValuesProvider {
+ JdbcParameterValuesProvider a;
+ JdbcParameterValuesProvider b;
+
+ public CompositeJdbcParameterValuesProvider(
+ JdbcParameterValuesProvider a, JdbcParameterValuesProvider b) {
+ Preconditions.checkArgument(
+ a.getParameterValues().length == b.getParameterValues().length,
+ "Both JdbcParameterValuesProvider should have the same length.");
+ this.a = a;
+ this.b = b;
+ }
+
+ @Override
+ public Serializable[][] getParameterValues() {
+ int batchNum = this.a.getParameterValues().length;
+ Serializable[][] parameters = new Serializable[batchNum][];
+ for (int i = 0; i < batchNum; i++) {
+ Serializable[] aSlice = a.getParameterValues()[i];
+ Serializable[] bSlice = b.getParameterValues()[i];
+ int totalLen = aSlice.length + bSlice.length;
+
+ Serializable[] batchParams = new Serializable[totalLen];
+
+ System.arraycopy(aSlice, 0, batchParams, 0, aSlice.length);
+ System.arraycopy(bSlice, 0, batchParams, aSlice.length, bSlice.length);
+ parameters[i] = batchParams;
+ }
+ return parameters;
+ }
+}
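A quick sketch of how the new provider composes, mirroring its use in JdbcDynamicTableSource
further down (the bounds and the pushed-down filter parameter are made up for illustration):

    import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
    import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
    import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
    import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;

    import java.io.Serializable;
    import java.util.Arrays;

    public class CompositeProviderSketch {
        public static void main(String[] args) {
            // Three partition splits over ids 0..29, each carrying the same
            // pushed-down filter parameter (42L).
            Serializable[][] pushdown = {{42L}, {42L}, {42L}};
            JdbcParameterValuesProvider provider =
                    new CompositeJdbcParameterValuesProvider(
                            new JdbcNumericBetweenParametersProvider(0, 29).ofBatchNum(3),
                            new JdbcGenericParameterValuesProvider(pushdown));
            // Per-batch arrays are concatenated: [0, 9, 42], [10, 19, 42], [20, 29, 42].
            for (Serializable[] batch : provider.getParameterValues()) {
                System.out.println(Arrays.toString(batch));
            }
        }
    }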
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 0bc7ae04792..e27cf63df50 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -22,25 +22,41 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
+import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
/** A {@link DynamicTableSource} for JDBC. */
@Internal
@@ -48,7 +64,9 @@ public class JdbcDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsProjectionPushDown,
- SupportsLimitPushDown {
+ SupportsLimitPushDown,
+ SupportsFilterPushDown {
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class);
private final JdbcConnectorOptions options;
private final JdbcReadOptions readOptions;
@@ -57,6 +75,8 @@ public class JdbcDynamicTableSource
private DataType physicalRowDataType;
private final String dialectName;
private long limit = -1;
+ private List<String> resolvedPredicates = new ArrayList<>();
+ private Serializable[] pushdownParams = new Serializable[0];
public JdbcDynamicTableSource(
JdbcConnectorOptions options,
@@ -117,21 +137,46 @@ public class JdbcDynamicTableSource
options.getTableName(),
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
new String[0]);
+ final List<String> predicates = new ArrayList<String>();
+
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
int numPartitions = readOptions.getNumPartitions().get();
+
+ Serializable[][] allPushdownParams = replicatePushdownParamsForN(numPartitions);
+ JdbcParameterValuesProvider allParams =
+ new CompositeJdbcParameterValuesProvider(
+ new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
+ .ofBatchNum(numPartitions),
+ new JdbcGenericParameterValuesProvider(allPushdownParams));
+
+ builder.setParametersProvider(allParams);
+
+ predicates.add(
+ dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
+ + " BETWEEN ? AND ?");
+ } else {
builder.setParametersProvider(
- new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
- .ofBatchNum(numPartitions));
- query +=
- " WHERE "
- + dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
- + " BETWEEN ? AND ?";
+ new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
}
+
+ predicates.addAll(this.resolvedPredicates);
+
+ if (predicates.size() > 0) {
+ String joinedConditions =
+ predicates.stream()
+ .map(pred -> String.format("(%s)", pred))
+ .collect(Collectors.joining(" AND "));
+ query += " WHERE " + joinedConditions;
+ }
+
if (limit >= 0) {
query = String.format("%s %s", query, dialect.getLimitClause(limit));
}
+
+ LOG.debug("Query generated for JDBC scan: " + query);
+
builder.setQuery(query);
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
builder.setRowConverter(dialect.getRowConverter(rowType));
@@ -159,8 +204,12 @@ public class JdbcDynamicTableSource
@Override
public DynamicTableSource copy() {
- return new JdbcDynamicTableSource(
- options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
+ JdbcDynamicTableSource newSource =
+ new JdbcDynamicTableSource(
+ options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
+ newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
+ newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
+ return newSource;
}
@Override
@@ -183,7 +232,9 @@ public class JdbcDynamicTableSource
&& Objects.equals(cache, that.cache)
&& Objects.equals(physicalRowDataType, that.physicalRowDataType)
&& Objects.equals(dialectName, that.dialectName)
- && Objects.equals(limit, that.limit);
+ && Objects.equals(limit, that.limit)
+ && Objects.equals(resolvedPredicates, that.resolvedPredicates)
+ && Arrays.deepEquals(pushdownParams, that.pushdownParams);
}
@Override
@@ -195,11 +246,51 @@ public class JdbcDynamicTableSource
cache,
physicalRowDataType,
dialectName,
- limit);
+ limit,
+ resolvedPredicates,
+ pushdownParams);
}
@Override
public void applyLimit(long limit) {
this.limit = limit;
}
+
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+ for (ResolvedExpression filter : filters) {
+ Optional<ParameterizedPredicate> simplePredicate = parseFilterToPredicate(filter);
+ if (simplePredicate.isPresent()) {
+ acceptedFilters.add(filter);
+ ParameterizedPredicate pred = simplePredicate.get();
+ this.pushdownParams = ArrayUtils.addAll(this.pushdownParams, pred.getParameters());
+ this.resolvedPredicates.add(pred.getPredicate());
+ } else {
+ remainingFilters.add(filter);
+ }
+ }
+
+ return Result.of(acceptedFilters, remainingFilters);
+ }
+
+ private Optional<ParameterizedPredicate> parseFilterToPredicate(ResolvedExpression filter) {
+ if (filter instanceof CallExpression) {
+ CallExpression callExp = (CallExpression) filter;
+ return callExp.accept(
+ new JdbcFilterPushdownPreparedStatementVisitor(
+ this.options.getDialect()::quoteIdentifier));
+ }
+ return Optional.empty();
+ }
+
+ private Serializable[][] replicatePushdownParamsForN(int n) {
+ Serializable[][] allPushdownParams = new Serializable[n][pushdownParams.length];
+ for (int i = 0; i < n; i++) {
+ allPushdownParams[i] = this.pushdownParams;
+ }
+ return allPushdownParams;
+ }
}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
new file mode 100644
index 00000000000..f9622a8a37d
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an {@link Expression} to a {@link ParameterizedPredicate}. Returns
+ * Optional.empty() if the filter cannot be pushed down.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+ extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+ private final Function<String, String> quoteIdentifierFunction;
+
+ public JdbcFilterPushdownPreparedStatementVisitor(
+ Function<String, String> quoteIdentifierFunction) {
+ this.quoteIdentifierFunction = quoteIdentifierFunction;
+ }
+
+ @Override
+ public Optional<ParameterizedPredicate> visit(CallExpression call) {
+ if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator("=", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator("<", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator("<=", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator(">", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator(">=", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator("<>", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator("OR", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+ return renderBinaryOperator("AND", call.getResolvedChildren());
+ }
+ if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
+ return renderUnaryOperator("IS NULL", call.getResolvedChildren().get(0), true);
+ }
+ if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
+ return renderUnaryOperator("IS NOT NULL", call.getResolvedChildren().get(0), true);
+ }
+
+ return Optional.empty();
+ }
+
+ private Optional<ParameterizedPredicate> renderBinaryOperator(
+ String operator, List<ResolvedExpression> allOperands) {
+ Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
+
+ Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
+
+ return leftOperandString.flatMap(
+ left -> rightOperandString.map(right -> left.combine(operator, right)));
+ }
+
+ private Optional<ParameterizedPredicate> renderUnaryOperator(
+ String operator, ResolvedExpression operand, boolean operandOnLeft) {
+ if (operand instanceof FieldReferenceExpression) {
+ Optional<ParameterizedPredicate> fieldPartialPredicate =
+ this.visit((FieldReferenceExpression) operand);
+ if (operandOnLeft) {
+ return fieldPartialPredicate.map(
+ fieldPred ->
+ new ParameterizedPredicate(
+ String.format(
+ "(%s %s)", fieldPred.getPredicate(), operator)));
+ } else {
+ return fieldPartialPredicate.map(
+ fieldPred ->
+ new ParameterizedPredicate(
+ String.format(
+ "(%s %s)", operator, fieldPred.getPredicate())));
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
+ LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+ Serializable[] params = new Serializable[1];
+
+ ParameterizedPredicate predicate = new ParameterizedPredicate("?");
+ switch (tpe.getTypeRoot()) {
+ case CHAR:
+ params[0] = litExp.getValueAs(String.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case VARCHAR:
+ params[0] = litExp.getValueAs(String.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case BOOLEAN:
+ params[0] = litExp.getValueAs(Boolean.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case DECIMAL:
+ params[0] = litExp.getValueAs(BigDecimal.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case TINYINT:
+ params[0] = litExp.getValueAs(Byte.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case SMALLINT:
+ params[0] = litExp.getValueAs(Short.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case INTEGER:
+ params[0] = litExp.getValueAs(Integer.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case BIGINT:
+ params[0] = litExp.getValueAs(Long.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case FLOAT:
+ params[0] = litExp.getValueAs(Float.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case DOUBLE:
+ params[0] = litExp.getValueAs(Double.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case DATE:
+ params[0] = litExp.getValueAs(LocalDate.class).map(Date::valueOf).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case TIME_WITHOUT_TIME_ZONE:
+ params[0] = litExp.getValueAs(java.sql.Time.class).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ params[0] =
+ litExp.getValueAs(LocalDateTime.class).map(Timestamp::valueOf).orElse(null);
+ predicate.setParameters(params);
+ return Optional.of(predicate);
+ default:
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Optional<ParameterizedPredicate> visit(FieldReferenceExpression fieldReference) {
+ String predicateStr = this.quoteIdentifierFunction.apply(fieldReference.toString());
+ ParameterizedPredicate predicate = new ParameterizedPredicate(predicateStr);
+ return Optional.of(predicate);
+ }
+
+ @Override
+ protected Optional<ParameterizedPredicate> defaultMethod(Expression expression) {
+ return Optional.empty();
+ }
+}
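A worked example of what the visitor builds, matching the expectations in the new unit test
further down; the dialect's quoteIdentifier is assumed to be the identity function here:

    import org.apache.flink.connector.jdbc.table.ParameterizedPredicate;

    import java.io.Serializable;

    public class VisitorOutputSketch {
        public static void main(String[] args) {
            // visit(FieldReferenceExpression) for column id:
            ParameterizedPredicate field = new ParameterizedPredicate("id");
            // visit(ValueLiteralExpression) for the BIGINT literal 6:
            ParameterizedPredicate literal = new ParameterizedPredicate("?");
            literal.setParameters(new Serializable[] {6L});
            // visit(CallExpression) for EQUALS delegates to renderBinaryOperator:
            ParameterizedPredicate rendered = field.combine("=", literal);
            System.out.println(rendered.getPredicate()); // (id = ?)
            System.out.println(rendered.getParameters()[0]); // 6
        }
    }

Anything the visitor does not recognize (e.g. LIKE) falls through to defaultMethod and yields
Optional.empty(), so the planner keeps evaluating that filter in Flink.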
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java
new file mode 100644
index 00000000000..82a9a856c76
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.Serializable;
+
+/** A data class that models a parameterized SQL predicate. */
+@Experimental
+public class ParameterizedPredicate {
+ private String predicate;
+ private Serializable[] parameters;
+
+ public ParameterizedPredicate(String predicate) {
+ this.predicate = predicate;
+ this.parameters = new Serializable[0];
+ }
+
+ public Serializable[] getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Serializable[] parameters) {
+ this.parameters = parameters;
+ }
+
+ public String getPredicate() {
+ return predicate;
+ }
+
+ public void setPredicate(String predicate) {
+ this.predicate = predicate;
+ }
+
+ public ParameterizedPredicate combine(String operator, ParameterizedPredicate that) {
+ this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate);
+ this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
+ return this;
+ }
+}
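The combine method is what renderBinaryOperator in the visitor relies on: it nests the two
predicate strings and concatenates their parameter arrays in order. A small sketch, with values
taken from the visitor tests below:

    import org.apache.flink.connector.jdbc.table.ParameterizedPredicate;

    import java.io.Serializable;
    import java.math.BigDecimal;
    import java.util.Arrays;

    public class CombineSketch {
        public static void main(String[] args) {
            ParameterizedPredicate left = new ParameterizedPredicate("(id = ?)");
            left.setParameters(new Serializable[] {6L});
            ParameterizedPredicate right = new ParameterizedPredicate("(real_col <= ?)");
            right.setParameters(new Serializable[] {new BigDecimal("0.6")});
            ParameterizedPredicate both = left.combine("AND", right);
            System.out.println(both.getPredicate()); // ((id = ?) AND (real_col <= ?))
            System.out.println(Arrays.toString(both.getParameters())); // [6, 0.6]
        }
    }

Note that combine mutates and returns the left-hand predicate, which suits the visitor's
bottom-up rendering but is worth knowing if the class is reused elsewhere.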
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index c982e4c1a6c..d23c5d194cd 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -264,6 +264,146 @@ public class JdbcDynamicTableSourceITCase {
.containsAll(result);
}
+ @Test
+ public void testFilter() throws Exception {
+ String partitionedTable = "PARTITIONED_TABLE";
+ tEnv.executeSql(
+ "CREATE TABLE "
+ + INPUT_TABLE
+ + "("
+ + "id BIGINT,"
+ + "timestamp6_col TIMESTAMP(6),"
+ + "timestamp9_col TIMESTAMP(9),"
+ + "time_col TIME,"
+ + "real_col FLOAT,"
+ + "double_col DOUBLE,"
+ + "decimal_col DECIMAL(10, 4)"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='"
+ + DB_URL
+ + "',"
+ + " 'table-name'='"
+ + INPUT_TABLE
+ + "'"
+ + ")");
+
+ // also create a partitioned table to ensure filter pushdown does not regress partitioned reads
+ tEnv.executeSql(
+ "CREATE TABLE "
+ + partitionedTable
+ + "("
+ + "id BIGINT,"
+ + "timestamp6_col TIMESTAMP(6),"
+ + "timestamp9_col TIMESTAMP(9),"
+ + "time_col TIME,"
+ + "real_col FLOAT,"
+ + "double_col DOUBLE,"
+ + "decimal_col DECIMAL(10, 4)"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='"
+ + DB_URL
+ + "',"
+ + " 'table-name'='"
+ + INPUT_TABLE
+ + "',"
+ + " 'scan.partition.column'='id',\n"
+ + " 'scan.partition.num'='1',\n"
+ + " 'scan.partition.lower-bound'='1',\n"
+ + " 'scan.partition.upper-bound'='1'\n"
+ + ")");
+
+ // we create a VIEW here to test column remapping, i.e. whether filter pushdown still works
+ // when querying through a view that renames the columns of the source table
+ tEnv.executeSql(
+ String.format(
+ "CREATE VIEW FAKE_TABLE ("
+ + "idx, timestamp6_col, timestamp9_col, time_col, real_col, double_col, decimal_col"
+ + ") as (SELECT * from %s )",
+ INPUT_TABLE));
+
+ List<String> onlyRow1 =
+ Stream.of(
+ "+I[1, 2020-01-01T15:35:00.123456, 2020-01-01T15:35:00.123456789, 15:35, 1.175E-37, 1.79769E308, 100.1234]")
+ .collect(Collectors.toList());
+
+ List<String> twoRows =
+ Stream.of(
+ "+I[1, 2020-01-01T15:35:00.123456, 2020-01-01T15:35:00.123456789, 15:35, 1.175E-37, 1.79769E308, 100.1234]",
+ "+I[2, 2020-01-01T15:36:01.123456, 2020-01-01T15:36:01.123456789, 15:36:01, -1.175E-37, -1.79769E308, 101.1234]")
+ .collect(Collectors.toList());
+
+ List<String> onlyRow2 =
+ Stream.of(
+ "+I[2, 2020-01-01T15:36:01.123456, 2020-01-01T15:36:01.123456789, 15:36:01, -1.175E-37, -1.79769E308, 101.1234]")
+ .collect(Collectors.toList());
+ List<String> noRows = new ArrayList<>();
+
+ // test simple filter
+ assertQueryReturns("SELECT * FROM FAKE_TABLE WHERE idx = 1", onlyRow1);
+ // test TIMESTAMP filter
+ assertQueryReturns(
+ "SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'",
+ onlyRow1);
+ // test the IN operator
+ assertQueryReturns(
+ "SELECT * FROM "
+ + "FAKE_TABLE"
+ + " WHERE 1 = idx AND decimal_col IN (100.1234, 101.1234)",
+ onlyRow1);
+ // test mixing the AND and OR operators
+ assertQueryReturns(
+ "SELECT * FROM "
+ + "FAKE_TABLE"
+ + " WHERE idx = 1 AND decimal_col = 100.1234 OR decimal_col = 101.1234",
+ twoRows);
+ // test mixing AND/OR with parentheses, and swapping the operands of the equality expression
+ assertQueryReturns(
+ "SELECT * FROM "
+ + "FAKE_TABLE"
+ + " WHERE (2 = idx AND decimal_col = 100.1234) OR decimal_col = 101.1234",
+ onlyRow2);
+
+ // test greater-than, just to make sure we didn't break anything we cannot push down
+ assertQueryReturns(
+ "SELECT * FROM "
+ + "FAKE_TABLE"
+ + " WHERE idx = 2 AND decimal_col > 100 OR decimal_col = 101.123",
+ onlyRow2);
+
+ // one more test with parentheses
+ assertQueryReturns(
+ "SELECT * FROM "
+ + "FAKE_TABLE"
+ + " WHERE 2 = idx AND (decimal_col = 100.1234 OR real_col = 101.1234)",
+ noRows);
+
+ assertQueryReturns(
+ "SELECT * FROM "
+ + partitionedTable
+ + " WHERE id = 2 AND decimal_col > 100 OR decimal_col = 101.123",
+ noRows);
+
+ assertQueryReturns(
+ "SELECT * FROM "
+ + partitionedTable
+ + " WHERE 1 = id AND decimal_col IN (100.1234, 101.1234)",
+ onlyRow1);
+ }
+
+ private List<String> rowIterToList(Iterator<Row> rows) {
+ return CollectionUtil.iteratorToList(rows).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ private void assertQueryReturns(String query, List<String> expected) {
+ List<String> actual = rowIterToList(tEnv.executeSql(query).collect());
+ assertThat(actual).isEqualTo(expected);
+ }
+
@ParameterizedTest
@EnumSource(Caching.class)
void testLookupJoin(Caching caching) throws Exception {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
new file mode 100644
index 00000000000..e7eb8a43a66
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.connector.jdbc.JdbcTestBase;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.derby.DerbyDialectFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rex.RexBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link JdbcFilterPushdownPreparedStatementVisitor}. */
+public class JdbcFilterPushdownPreparedStatementVisitorTest {
+
+ private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+ public static final String DB_URL = "jdbc:derby:memory:test";
+ public static final String INPUT_TABLE = "jdbDynamicTableSource";
+
+ public static StreamExecutionEnvironment env;
+ public static TableEnvironment tEnv;
+
+ @Before
+ public void before() throws ClassNotFoundException, SQLException {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ tEnv = StreamTableEnvironment.create(env);
+
+ System.setProperty(
+ "derby.stream.error.field", JdbcTestBase.class.getCanonicalName() + ".DEV_NULL");
+ Class.forName(DRIVER_CLASS);
+
+ try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate(
+ "CREATE TABLE "
+ + INPUT_TABLE
+ + " ("
+ + "id BIGINT NOT NULL,"
+ + "description VARCHAR(200) NOT NULL,"
+ + "timestamp6_col TIMESTAMP, "
+ + "timestamp9_col TIMESTAMP, "
+ + "time_col TIME, "
+ + "real_col FLOAT(23), "
+ + // A precision of 23 or less makes FLOAT equivalent to REAL.
+ "double_col FLOAT(24),"
+ + // A precision of 24 or greater makes FLOAT equivalent to DOUBLE
+ // PRECISION.
+ "decimal_col DECIMAL(10, 4))");
+ }
+ // Create the table in Flink; it can be reused across test cases
+ tEnv.executeSql(
+ "CREATE TABLE "
+ + INPUT_TABLE
+ + "("
+ + "id BIGINT,"
+ + "description VARCHAR(200),"
+ + "timestamp6_col TIMESTAMP(6),"
+ + "timestamp9_col TIMESTAMP(9),"
+ + "time_col TIME,"
+ + "real_col FLOAT,"
+ + "double_col DOUBLE,"
+ + "decimal_col DECIMAL(10, 4)"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='"
+ + DB_URL
+ + "',"
+ + " 'table-name'='"
+ + INPUT_TABLE
+ + "'"
+ + ")");
+ }
+
+ @After
+ public void clearOutputTable() throws Exception {
+ Class.forName(DRIVER_CLASS);
+ try (Connection conn = DriverManager.getConnection(DB_URL);
+ Statement stat = conn.createStatement()) {
+ stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
+ }
+ StreamTestSink.clear();
+ }
+
+ @Test
+ public void testSimpleExpressionPrimitiveType() {
+ ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+ Arrays.asList(
+ new Object[] {"id = 6", "id = ?", 6L},
+ new Object[] {"id >= 6", "id >= ?", 6},
+ new Object[] {"id > 6", "id > ?", 6},
+ new Object[] {"id < 6", "id < ?", 6},
+ new Object[] {"id <= 5", "id <= ?", 5},
+ new Object[] {"description = 'Halo'", "description = ?", "Halo"},
+ new Object[] {"real_col > 0.5", "real_col > ?", new BigDecimal("0.5")},
+ new Object[] {
+ "double_col <= -0.3", "double_col <= ?", new BigDecimal("-0.3")
+ })
+ .forEach(
+ inputs ->
+ assertSimpleInputExprEqualsOutExpr(
+ (String) inputs[0],
+ schema,
+ (String) inputs[1],
+ (Serializable) inputs[2]));
+ }
+
+ @Test
+ public void testComplexExpressionDatetime() {
+ ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+ String andExpr = "id = 6 AND timestamp6_col = TIMESTAMP '2022-01-01 07:00:01.333'";
+ Serializable[] expectedParams1 = {6L, Timestamp.valueOf("2022-01-01 07:00:01.333000")};
+ assertGeneratedSQLString(
+ andExpr, schema, "((id = ?) AND (timestamp6_col = ?))", expectedParams1);
+
+ Serializable[] expectedParams2 = {Timestamp.valueOf("2022-01-01 07:00:01.333"), "Halo"};
+ String orExpr =
+ "timestamp9_col = TIMESTAMP '2022-01-01 07:00:01.333' OR description = 'Halo'";
+ assertGeneratedSQLString(
+ orExpr, schema, "((timestamp9_col = ?) OR (description = ?))", expectedParams2);
+ }
+
+ @Test
+ public void testExpressionWithNull() {
+ ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+ String andExpr = "id = NULL AND real_col <= 0.6";
+
+ Serializable[] expectedParams1 = {null, new BigDecimal("0.6")};
+ assertGeneratedSQLString(
+ andExpr, schema, "((id = ?) AND (real_col <= ?))", expectedParams1);
+
+ Serializable[] expectedParams2 = {6L, null};
+ String orExpr = "id = 6 OR description = NULL";
+ assertGeneratedSQLString(
+ orExpr, schema, "((id = ?) OR (description = ?))", expectedParams2);
+ }
+
+ @Test
+ public void testExpressionIsNull() {
+ ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+ String andExpr = "id IS NULL AND real_col <= 0.6";
+
+ Serializable[] expectedParams1 = {new BigDecimal("0.6")};
+ assertGeneratedSQLString(
+ andExpr, schema, "((id IS NULL) AND (real_col <= ?))", expectedParams1);
+
+ Serializable[] expectedParams2 = {6L};
+ String orExpr = "id = 6 OR description IS NOT NULL";
+ assertGeneratedSQLString(
+ orExpr, schema, "((id = ?) OR (description IS NOT NULL))", expectedParams2);
+ }
+
+ @Test
+ public void testComplexExpressionPrimitiveType() {
+ ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+ String andExpr = "id = NULL AND real_col <= 0.6";
+ Serializable[] expectedParams1 = {null, new BigDecimal("0.6")};
+ assertGeneratedSQLString(
+ andExpr, schema, "((id = ?) AND (real_col <= ?))", expectedParams1);
+
+ String orExpr = "id = 6 OR description = NULL";
+ Serializable[] expectedParams2 = {6L, null};
+ assertGeneratedSQLString(
+ orExpr, schema, "((id = ?) OR (description = ?))", expectedParams2);
+ }
+
+ private void assertGeneratedSQLString(
+ String inputExpr,
+ ResolvedSchema schema,
+ String expectedOutputExpr,
+ Serializable[] expectedParams) {
+ List<ResolvedExpression> resolved = resolveSQLFilterToExpression(inputExpr, schema);
+ assertEquals(1, resolved.size());
+ JdbcDialect dialect = new DerbyDialectFactory().create();
+ JdbcFilterPushdownPreparedStatementVisitor visitor =
+ new JdbcFilterPushdownPreparedStatementVisitor(dialect::quoteIdentifier);
+ ParameterizedPredicate pred = resolved.get(0).accept(visitor).get();
+
+ // our visitor always wraps the rendered expression in parentheses
+ assertEquals(expectedOutputExpr, pred.getPredicate());
+ assertArrayEquals(expectedParams, pred.getParameters());
+ }
+
+ private void assertSimpleInputExprEqualsOutExpr(
+ String inputExpr, ResolvedSchema schema, String expectedOutput, Serializable param) {
+ // our visitor always wraps the rendered expression in parentheses
+ Serializable[] params = new Serializable[1];
+ params[0] = param;
+ assertGeneratedSQLString(inputExpr, schema, "(" + expectedOutput + ")", params);
+ }
+
+ /**
+ * Resolves a SQL filter expression against a schema. Note that this method relies on some
+ * implementation details of Flink's planner.
+ */
+ private List<ResolvedExpression> resolveSQLFilterToExpression(
+ String sqlExp, ResolvedSchema schema) {
+ StreamTableEnvironmentImpl tbImpl = (StreamTableEnvironmentImpl) tEnv;
+
+ FlinkContext ctx = ((PlannerBase) tbImpl.getPlanner()).getFlinkContext();
+ CatalogManager catMan = tbImpl.getCatalogManager();
+ FunctionCatalog funCat = ctx.getFunctionCatalog();
+ RowType sourceType = (RowType) schema.toSourceRowDataType().getLogicalType();
+
+ FlinkTypeFactory typeFactory = new FlinkTypeFactory(classLoader, FlinkTypeSystem.INSTANCE);
+ RexNodeToExpressionConverter converter =
+ new RexNodeToExpressionConverter(
+ new RexBuilder(typeFactory),
+ sourceType.getFieldNames().toArray(new String[0]),
+ funCat,
+ catMan,
+ TimeZone.getTimeZone(tEnv.getConfig().getLocalTimeZone()));
+
+ RexNodeExpression rexExp =
+ (RexNodeExpression) tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
+ ResolvedExpression resolvedExp =
+ rexExp.getRexNode()
+ .accept(converter)
+ .getOrElse(
+ () -> {
+ throw new IllegalArgumentException(
+ "Cannot convert "
+ + rexExp.getRexNode()
+ + " to Expression, this likely "
+ + "means you used some function(s) not "
+ + "supported with this setup.");
+ });
+ ExpressionResolver resolver =
+ ExpressionResolver.resolverFor(
+ tEnv.getConfig(),
+ classLoader,
+ name -> Optional.empty(),
+ funCat.asLookup(
+ str -> {
+ throw new TableException(
+ "We should not need to lookup any expressions at this point");
+ }),
+ catMan.getDataTypeFactory(),
+ (sqlExpression, inputRowType, outputType) -> {
+ throw new TableException(
+ "SQL expression parsing is not supported at this location.");
+ })
+ .build();
+
+ return resolver.resolve(Arrays.asList(resolvedExp));
+ }
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
index 642fafe2a80..adac5b2ac1f 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -58,4 +58,10 @@ public class JdbcTablePlanTest extends TableTestBase {
public void testLimitPushDown() {
util.verifyExecPlan("SELECT id, time_col FROM jdbc LIMIT 3");
}
+
+ @Test
+ public void testFilterPushdown() {
+ util.verifyExecPlan(
+ "SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
+ }
}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
index 2333a35ad84..b69903f3c36 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -51,4 +51,21 @@ TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decima
]]>
</Resource>
</TestCase>
+ <TestCase name="testFilterPushdown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5, -1000.23:DECIMAL(6, 2)))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, jdbc, filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))), OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))], project=[id, time_col, real_col]]], fields=[id, time_col, real_col])
+]]>
+ </Resource>
+ </TestCase>
</Root>