Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/28 03:26:48 UTC

[GitHub] [flink] godfreyhe opened a new pull request, #20384: [FLINK-28707][table] Introduce SupportsDynamicFiltering

godfreyhe opened a new pull request, #20384:
URL: https://github.com/apache/flink/pull/20384

   
   
   ## What is the purpose of the change
   
   *Introduce SupportsDynamicFiltering to support the dynamic partition pruning optimization. See the [FLIP doc](https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning) for more details.*
   
   
   ## Brief change log
   
   
     - *Introduce the SupportsDynamicFiltering interface*
   
   
   ## Verifying this change
   
   
   
   This change only introduces an interface and is not covered by tests.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20384: [FLINK-28707][table] Introduce SupportsDynamicFiltering

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20384:
URL: https://github.com/apache/flink/pull/20384#discussion_r933708970


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import java.util.List;
+
+/**
+ * Push dynamic filter into {@link ScanTableSource}, the table source can filter the partitions even
+ * the input data in runtime to reduce scan I/O.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * SELECT * FROM partitioned_fact_table t1, non_partitioned_dim_table t2
+ *   WHERE t1.part_key = t2.col1 AND t2.col2 = 100
+ * }</pre>
+ *
+ * <p>In the example above, `partitioned_fact_table` is partition table which partition key is
+ * `part_key`, and `non_partitioned_dim_table` is a non-partition table which data contains all
+ * partition values of `partitioned_fact_table`. With the filter {@code t2.col2 = 100}, only a small
+ * part of the partitions need to be scanned out to do the join operation. The specific partitions
+ * is not available in the optimization phase but in the execution phase.
+ *
+ * <p>Unlike {@link SupportsPartitionPushDown}, the conditions in the WHERE clause are analyzed to
+ * determine in advance which partitions can be safely skipped in the optimization phase. While such
+ * the query above, the specific partitions is not available in the optimization phase but in the
+ * execution phase.
+ *
+ * <p>By default, if this interface is not implemented, the data is read entirely with a subsequent
+ * filter operation after the source.
+ *
+ * <p>If this interface is implemented, this interface just tells the source which fields can be
+ * applied for filtering and the source needs to pick the fields that can be supported and return
+ * them to planner. Then the planner will build the plan and construct the operator which will send
+ * the data to the source in runtime.
+ *
+ * <p>In the future, more flexible filtering can be pushed into the source connectors through this
+ * interface.
+ */
+@PublicEvolving
+public interface SupportsDynamicFiltering {
+
+    /**
+     * Apply the candidate filter fields into the table source, and return the accepted fields. The

Review Comment:
   nit: Applies
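The contract described in the Javadoc quoted above (the planner offers candidate filter fields, the source keeps the fields it can support and returns them to the planner) can be illustrated with a small, self-contained Java sketch. It is not the Flink API: the method signature is cut off in the quoted diff, so `DynamicFilteringSource`, `applyCandidateFilterFields`, and `PartitionedSource` are hypothetical names, and the rule that a source accepts only its partition keys is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class DynamicFilteringContractSketch {

    /** Hypothetical stand-in for the ability interface; not the real Flink signature. */
    interface DynamicFilteringSource {
        /** Receives the candidate filter fields and returns the subset the source accepts. */
        List<String> applyCandidateFilterFields(List<String> candidateFilterFields);
    }

    /** Hypothetical source that, by assumption, can only filter on its partition keys. */
    static final class PartitionedSource implements DynamicFilteringSource {
        private final List<String> partitionKeys;
        private List<String> acceptedFields = List.of();

        PartitionedSource(List<String> partitionKeys) {
            this.partitionKeys = List.copyOf(partitionKeys);
        }

        @Override
        public List<String> applyCandidateFilterFields(List<String> candidateFilterFields) {
            List<String> accepted = new ArrayList<>();
            for (String field : candidateFilterFields) {
                // Assumption for this sketch: only partition keys can be used to skip data.
                if (partitionKeys.contains(field)) {
                    accepted.add(field);
                }
            }
            // Keep the accepted fields so the (simulated) runtime reader knows what to filter on.
            acceptedFields = List.copyOf(accepted);
            return acceptedFields;
        }

        List<String> acceptedFields() {
            return acceptedFields;
        }
    }

    public static void main(String[] args) {
        PartitionedSource source = new PartitionedSource(List.of("part_key"));
        // The planner offers the fact table's join keys as candidates.
        List<String> accepted = source.applyCandidateFilterFields(List.of("part_key", "other_col"));
        // Only accepted fields would get a runtime filtering operator.
        System.out.println("accepted = " + accepted);
    }
}
```

Running `main` prints `accepted = [part_key]`: `other_col` is dropped, so in this sketch the planner would only build the runtime filtering path for `part_key`.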



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java:
##########
@@ -0,0 +1,68 @@
+ * <p>In the example above, `partitioned_fact_table` is partition table which partition key is

Review Comment:
   which -> whose
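The SQL example in the Javadoc quoted earlier can be made concrete with in-memory data: the dimension side is evaluated first, the `col1` values of rows matching `t2.col2 = 100` are collected at runtime, and only the fact-table partitions whose `part_key` is in that set are scanned. This is a hedged simulation, not planner code; `DimRow`, the helper methods, and the partition values are hypothetical, and in Flink the collected values would reach the source through a runtime operator rather than a direct method call.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DynamicPartitionPruningSketch {

    /** Hypothetical row of non_partitioned_dim_table with columns col1 and col2. */
    static final class DimRow {
        final String col1;
        final int col2;

        DimRow(String col1, int col2) {
            this.col1 = col1;
            this.col2 = col2;
        }
    }

    /** Runtime step 1: apply the dimension-side filter and collect the join-key values. */
    static Set<String> collectPartitionValues(List<DimRow> dimRows, int col2Filter) {
        return dimRows.stream()
                .filter(row -> row.col2 == col2Filter)
                .map(row -> row.col1)
                .collect(Collectors.toSet());
    }

    /** Runtime step 2: the fact-table source keeps only partitions whose value was collected. */
    static List<String> prunePartitions(List<String> factPartitions, Set<String> allowedValues) {
        return factPartitions.stream()
                .filter(allowedValues::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<DimRow> dim = List.of(
                new DimRow("2022-07-26", 7),
                new DimRow("2022-07-27", 100),
                new DimRow("2022-07-28", 100));
        List<String> factPartitions =
                List.of("2022-07-25", "2022-07-26", "2022-07-27", "2022-07-28");

        // Both steps run while the job executes; the planner cannot know this set in advance.
        Set<String> values = collectPartitionValues(dim, 100);
        List<String> toScan = prunePartitions(factPartitions, values);
        System.out.println("partitions to scan: " + toScan);
    }
}
```

With the sample data, `main` prints `partitions to scan: [2022-07-27, 2022-07-28]`, so two of the four fact partitions are skipped without being read.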



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java:
##########
@@ -0,0 +1,68 @@
+ * <p>Unlike {@link SupportsPartitionPushDown}, the conditions in the WHERE clause are analyzed to
+ * determine in advance which partitions can be safely skipped in the optimization phase. While such

Review Comment:
   While such the query above -> For such queries?
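The contrast with `SupportsPartitionPushDown` drawn in the quoted Javadoc is about when the pruning values are known. In the sketch below (all names hypothetical, not Flink API), a literal predicate on the partition key, such as `part_key = '2022-07-28'`, can be evaluated against the partition list while planning, whereas the values for the join in the example only exist at runtime, modeled here as a `Supplier` that is resolved when the scan starts.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class StaticVsDynamicPruningSketch {

    /** Plan time: the literal appears in the query text, so pruning can happen before execution. */
    static List<String> pruneAtPlanTime(List<String> partitions, String literal) {
        return partitions.stream().filter(literal::equals).collect(Collectors.toList());
    }

    /**
     * Runtime: the planner can only hand over a deferred source of values; in this model the
     * set is resolved when the scan starts, after the other join input has been evaluated.
     */
    static List<String> pruneAtRuntime(List<String> partitions, Supplier<Set<String>> runtimeValues) {
        Set<String> allowed = runtimeValues.get();
        return partitions.stream().filter(allowed::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("2022-07-26", "2022-07-27", "2022-07-28");
        System.out.println(pruneAtPlanTime(partitions, "2022-07-28"));
        System.out.println(pruneAtRuntime(partitions, () -> Set.of("2022-07-26", "2022-07-28")));
    }
}
```

The first call prints `[2022-07-28]` and the second prints `[2022-07-26, 2022-07-28]`; the only difference is whether the allowed values are available when the plan is built.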



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Push dynamic filter into {@link ScanTableSource}, the table source can filter the partitions even

Review Comment:
   nit: push -> pushes




[GitHub] [flink] godfreyhe closed pull request #20384: [FLINK-28707][table] Introduce SupportsDynamicFiltering

godfreyhe closed pull request #20384: [FLINK-28707][table] Introduce SupportsDynamicFiltering
URL: https://github.com/apache/flink/pull/20384



[GitHub] [flink] godfreyhe commented on pull request #20384: [FLINK-28707][table] Introduce SupportsDynamicFiltering

godfreyhe commented on PR #20384:
URL: https://github.com/apache/flink/pull/20384#issuecomment-1200078674

   Thanks for the review. I will update the comments locally and merge the PR.



[GitHub] [flink] flinkbot commented on pull request #20384: [FLINK-28707][table] Introduce SupportsDynamicFiltering

flinkbot commented on PR #20384:
URL: https://github.com/apache/flink/pull/20384#issuecomment-1197614748

   ## CI report:
   
   * c454070ee9d484a4aaf81c5fc99ee3d58eae0ffd UNKNOWN
   

