Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/03 08:59:37 UTC

[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: Flink 16024/1.16 jdbc filter

qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r912452959


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java:
##########
@@ -142,4 +143,18 @@ Optional<String> getUpsertStatement(
      */
     String getSelectFromStatement(
             String tableName, String[] selectFields, String[] conditionFields);
+
+    /**
+     * Get a {@link JdbcFilterPushdownVisitor} that can convert a Flink SQL filter expression
+     * into the corresponding filter expression of this SQL dialect. The resulting string can
+     * then be pushed down to the SQL data source to optimize the query.
+     *
+     * <p>You can customize the rendering for your dialect by overriding this method and
+     * extending {@link JdbcFilterPushdownVisitor}.
+     *
+     * @return {@link JdbcFilterPushdownVisitor}
+     */
+    default JdbcFilterPushdownVisitor getFilterPushdownVisitor() {

Review Comment:
   This should be dialect-specific so that it can be specialized to a particular SQL syntax.
   
   I noticed we removed all default implementations from this interface; should I do the same for this method?
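   
   For illustration, a dialect could specialize the visitor roughly like this (a minimal sketch in Java; MySqlDialect and MySqlFilterPushdownVisitor are hypothetical names, not code from this PR):
   
       public class MySqlDialect implements JdbcDialect {
   
           // ... other JdbcDialect methods omitted ...
   
           @Override
           public JdbcFilterPushdownVisitor getFilterPushdownVisitor() {
               // Hypothetical subclass that renders predicates with
               // MySQL-specific syntax, e.g. backtick-quoted identifiers.
               return new MySqlFilterPushdownVisitor();
           }
       }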



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -31,28 +31,41 @@
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** A {@link DynamicTableSource} for JDBC. */
 @Internal
 public class JdbcDynamicTableSource
         implements ScanTableSource,
                 LookupTableSource,
                 SupportsProjectionPushDown,
-                SupportsLimitPushDown {
+                SupportsLimitPushDown,
+                SupportsFilterPushDown {

Review Comment:
   This adds filter pushdown support to the JDBC source by implementing SupportsFilterPushDown.
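   
   As a sketch of how the collected predicates could reach the database (an assumption about the wiring, not this PR's exact code), the rendered filter strings can be appended to the scan query as a WHERE clause:
   
       // Minimal sketch, assuming resolvedPredicates holds the rendered filter strings:
       String query = dialect.getSelectFromStatement(tableName, selectFields, new String[0]);
       if (!resolvedPredicates.isEmpty()) {
           query += " WHERE " + String.join(" AND ", resolvedPredicates);
       }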



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -171,17 +205,60 @@ public boolean equals(Object o) {
                 && Objects.equals(lookupOptions, that.lookupOptions)
                 && Objects.equals(physicalRowDataType, that.physicalRowDataType)
                 && Objects.equals(dialectName, that.dialectName)
-                && Objects.equals(limit, that.limit);
+                && Objects.equals(limit, that.limit)
+                && Objects.equals(resolvedPredicates, that.resolvedPredicates);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
-                options, readOptions, lookupOptions, physicalRowDataType, dialectName, limit);
+                options,
+                readOptions,
+                lookupOptions,
+                physicalRowDataType,
+                dialectName,
+                limit,
+                resolvedPredicates);
     }
 
     @Override
     public void applyLimit(long limit) {
         this.limit = limit;
     }
+
+    /**
+     * This method uses {@link JdbcFilterPushdownVisitor} to generate dialect-specific SQL
+     * expressions. The visitor returns Optional.empty() for any filter it cannot handle,
+     * and such filters are then evaluated by the Flink runtime instead.
+     */
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {

Review Comment:
   Core implementation of this change:
   
   We traverse each ResolvedExpression and produce a String if we know how to push it down to the SQL database, returning Optional.empty() if we cannot handle it.
   
   Any unhandled expression is kept as a Flink SQL filter and evaluated in the job at runtime.
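   
   A minimal sketch of what applyFilters could look like under these semantics (assuming JdbcFilterPushdownVisitor is an ExpressionVisitor<Optional<String>> obtained from the dialect; local names are illustrative):
   
       @Override
       public Result applyFilters(List<ResolvedExpression> filters) {
           List<ResolvedExpression> acceptedFilters = new ArrayList<>();
           List<ResolvedExpression> remainingFilters = new ArrayList<>();
           // Assumes the dialect is available on the source, e.g. derived from options.
           JdbcFilterPushdownVisitor visitor = dialect.getFilterPushdownVisitor();
           for (ResolvedExpression filter : filters) {
               Optional<String> rendered = filter.accept(visitor);
               if (rendered.isPresent()) {
                   // Keep the rendered predicate for the scan query.
                   this.resolvedPredicates.add(rendered.get());
                   acceptedFilters.add(filter);
               } else {
                   // Leave this filter to be evaluated by the Flink runtime.
                   remainingFilters.add(filter);
               }
           }
           return Result.of(acceptedFilters, remainingFilters);
       }
   
   Result.of takes the filters the source consumes and the filters the planner must still evaluate, so correctness is preserved even when a predicate cannot be translated.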



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org