Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/30 15:17:32 UTC

[GitHub] [flink] lincoln-lil opened a new pull request, #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

lincoln-lil opened a new pull request, #20118:
URL: https://github.com/apache/flink/pull/20118

   ## What is the purpose of the change
   Fix the error in `PushProjectIntoTableSourceScanRule` when a table source implements `SupportsReadingMetadata` but not `SupportsProjectionPushDown`.
   
   ## Brief change log
   Fix the logic of `rewriteProjections` in `PushProjectIntoTableSourceScanRule`.
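
A minimal sketch of the scenario this change addresses, under simplifying assumptions: row types are modeled as plain lists of field names and projections as input-ref indices (no Calcite types), and the class and method names are hypothetical. It assumes that a source which supports reading metadata but not projection push-down keeps all physical columns and appends only the metadata columns the query reads, so an input ref that points at a metadata column has to be re-pointed by field name. That is the idea behind `MetadataOnlyProjectionRewriter` in the quoted diff, not its actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of a metadata-only projection rewrite (no Calcite). */
class MetadataOnlyRemapSketch {

    /** Re-points each input ref from the old input row type to the scan's new row type by name. */
    static List<Integer> remapByName(
            List<String> oldInputFields, List<String> newScanFields, List<Integer> projections) {
        List<Integer> rewritten = new ArrayList<>();
        for (int ref : projections) {
            String name = oldInputFields.get(ref);
            int newIndex = newScanFields.indexOf(name);
            if (newIndex < 0) {
                throw new IllegalStateException("Field not produced by the scan: " + name);
            }
            rewritten.add(newIndex);
        }
        return rewritten;
    }

    public static void main(String[] args) {
        // Declared schema: physical a, b plus metadata m1, m2.
        List<String> input = Arrays.asList("a", "b", "m1", "m2");
        // Assumed scan row type: all physical columns, plus only the metadata column used (m2).
        List<String> scan = Arrays.asList("a", "b", "m2");
        // SELECT a, m2 refers to indices 0 and 3 of the declared schema.
        List<Integer> refs = Arrays.asList(0, 3);
        System.out.println(remapByName(input, scan, refs)); // [0, 2]
    }
}
```

Keeping the original index 3 here would point past the end of the three-field scan row, which is the kind of mismatch the fix targets.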
   
   ## Verifying this change
   Added new test cases to `TableSourceTest`.
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   
   ## Documentation
     - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lincoln-lil commented on pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #20118:
URL: https://github.com/apache/flink/pull/20118#issuecomment-1171824806

   @flinkbot run azure



[GitHub] [flink] godfreyhe commented on a diff in pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

godfreyhe commented on code in PR #20118:
URL: https://github.com/apache/flink/pull/20118#discussion_r915444008


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java:
##########
@@ -337,11 +339,71 @@ private RowType performPushDown(
     private List<RexNode> rewriteProjections(
             RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
         final LogicalProject project = call.rel(0);
+        List<RexNode> newProjects = project.getProjects();
+
         if (supportsProjectionPushDown(source.tableSource())) {
-            return NestedProjectionUtil.rewrite(
-                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
-        } else {
-            return project.getProjects();
+            // if support project push down, then all input ref will be rewritten includes metadata
+            // columns.
+            newProjects =
+                    NestedProjectionUtil.rewrite(
+                            newProjects, projectedSchema, call.builder().getRexBuilder());
+        } else if (supportsMetadata(source.tableSource())) {
+            // supportsMetadataProjection only.
+            // Note: why not reuse the NestedProjectionUtil to rewrite metadata projection? because
+            // it only works for sources which support projection push down.
+            List<Column.MetadataColumn> metadataColumns =
+                    DynamicSourceUtils.extractMetadataColumns(
+                            source.contextResolvedTable().getResolvedSchema());
+            if (metadataColumns.size() > 0) {
+                Set<String> metaCols =
+                        metadataColumns.stream().map(m -> m.getName()).collect(Collectors.toSet());
+
+                MetadataOnlyProjectionRewriter rewriter =
+                        new MetadataOnlyProjectionRewriter(
+                                project.getInput().getRowType(), source.getRowType(), metaCols);
+
+                newProjects =
+                        newProjects.stream()
+                                .map(p -> p.accept(rewriter))
+                                .collect(Collectors.toList());
+            }
+        }
+
+        return newProjects;
+    }
+
+    private class MetadataOnlyProjectionRewriter extends RexShuttle {

Review Comment:
   nit: This class can be marked as `static`
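
A small illustration of the nit, with hypothetical class names: a non-static inner class can only be instantiated through an enclosing instance and implicitly carries a reference to it, while a `static` nested class has no such hidden reference. As the reviewer suggests, a rewriter that receives everything it needs through its constructor does not need the enclosing rule instance, and marking it `static` makes that explicit.

```java
import java.util.function.IntUnaryOperator;

/** Hypothetical stand-in for a planner rule that declares helper rewriters. */
class RuleWithRewriters {

    // Inner (non-static) class: each instance is tied to an enclosing RuleWithRewriters.
    class InnerRewriter implements IntUnaryOperator {
        @Override
        public int applyAsInt(int ref) {
            return ref + 1;
        }
    }

    // Static nested class: no implicit reference to an enclosing instance.
    static class StaticRewriter implements IntUnaryOperator {
        @Override
        public int applyAsInt(int ref) {
            return ref + 1;
        }
    }

    public static void main(String[] args) {
        // An inner class needs an enclosing instance to be created...
        IntUnaryOperator inner = new RuleWithRewriters().new InnerRewriter();
        // ...a static nested class does not.
        IntUnaryOperator nested = new StaticRewriter();
        System.out.println(inner.applyAsInt(1) + " " + nested.applyAsInt(1)); // 2 2
    }
}
```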




[GitHub] [flink] lincoln-lil commented on a diff in pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

lincoln-lil commented on code in PR #20118:
URL: https://github.com/apache/flink/pull/20118#discussion_r915383719


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java:
##########
@@ -337,11 +339,69 @@ private RowType performPushDown(
     private List<RexNode> rewriteProjections(
             RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
         final LogicalProject project = call.rel(0);
+        List<RexNode> newProjects = project.getProjects();
+
         if (supportsProjectionPushDown(source.tableSource())) {
-            return NestedProjectionUtil.rewrite(
-                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
-        } else {
-            return project.getProjects();
+            // if support project push down, then all input ref will be rewritten includes metadata

Review Comment:
   I've created a ticket to track this: https://issues.apache.org/jira/browse/FLINK-28434
   cc @fsk119 




[GitHub] [flink] flinkbot commented on pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

flinkbot commented on PR #20118:
URL: https://github.com/apache/flink/pull/20118#issuecomment-1171356729

   ## CI report:
   
   * c208d7441a53da94002d563a5777c90c38dfd5cb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] godfreyhe commented on a diff in pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

godfreyhe commented on code in PR #20118:
URL: https://github.com/apache/flink/pull/20118#discussion_r914676211


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java:
##########
@@ -337,11 +339,69 @@ private RowType performPushDown(
     private List<RexNode> rewriteProjections(
             RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
         final LogicalProject project = call.rel(0);
+        List<RexNode> newProjects = project.getProjects();
+
         if (supportsProjectionPushDown(source.tableSource())) {
-            return NestedProjectionUtil.rewrite(
-                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
-        } else {
-            return project.getProjects();
+            // if support project push down, then all input ref will be rewritten includes metadata

Review Comment:
   It would be better to create a JIRA ticket to track this improvement.




[GitHub] [flink] godfreyhe commented on a diff in pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

godfreyhe commented on code in PR #20118:
URL: https://github.com/apache/flink/pull/20118#discussion_r913549951


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java:
##########
@@ -337,11 +339,69 @@ private RowType performPushDown(
     private List<RexNode> rewriteProjections(
             RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
         final LogicalProject project = call.rel(0);
+        List<RexNode> newProjects = project.getProjects();
+
         if (supportsProjectionPushDown(source.tableSource())) {
-            return NestedProjectionUtil.rewrite(
-                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
-        } else {
-            return project.getProjects();
+            // if support project push down, then all input ref will be rewritten includes metadata

Review Comment:
   Since `NestedProjectionUtil` already handles both metadata columns and physical columns, we should reuse it instead of introducing a new rewriter.
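
A simplified sketch of the kind of index rewrite the reviewer refers to. This is not the real `NestedProjectionUtil` API: it is a flat model (no nested fields, no Calcite types) with hypothetical names, in which only referenced fields survive in a narrowed schema, in first-use order (an assumption of this sketch), and every input ref is re-pointed at its slot in that schema. Because fields are looked up by name, physical and metadata columns go through the same path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical flat model of a projection-push-down rewrite (not NestedProjectionUtil). */
class ProjectedSchemaRewriteSketch {

    /** Builds the narrowed schema: referenced field name -> new index, in first-use order. */
    static Map<String, Integer> buildProjectedSchema(List<String> inputFields, List<Integer> refs) {
        Map<String, Integer> projected = new LinkedHashMap<>();
        for (int ref : refs) {
            // size() is evaluated before insertion, so it is the next free slot.
            projected.putIfAbsent(inputFields.get(ref), projected.size());
        }
        return projected;
    }

    /** Rewrites each input ref to the referenced field's position in the narrowed schema. */
    static List<Integer> rewrite(
            List<String> inputFields, List<Integer> refs, Map<String, Integer> projected) {
        List<Integer> out = new ArrayList<>();
        for (int ref : refs) {
            out.add(projected.get(inputFields.get(ref)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Physical a, b, c plus metadata m1, m2; query: SELECT c, m2, c
        List<String> input = Arrays.asList("a", "b", "c", "m1", "m2");
        List<Integer> refs = Arrays.asList(2, 4, 2);
        Map<String, Integer> projected = buildProjectedSchema(input, refs);
        System.out.println(projected.keySet()); // [c, m2]
        System.out.println(rewrite(input, refs, projected)); // [0, 1, 0]
    }
}
```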




[GitHub] [flink] godfreyhe closed pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

godfreyhe closed pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown
URL: https://github.com/apache/flink/pull/20118



[GitHub] [flink] lincoln-lil commented on a diff in pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

lincoln-lil commented on code in PR #20118:
URL: https://github.com/apache/flink/pull/20118#discussion_r914527539


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java:
##########
@@ -337,11 +339,69 @@ private RowType performPushDown(
     private List<RexNode> rewriteProjections(
             RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
         final LogicalProject project = call.rel(0);
+        List<RexNode> newProjects = project.getProjects();
+
         if (supportsProjectionPushDown(source.tableSource())) {
-            return NestedProjectionUtil.rewrite(
-                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
-        } else {
-            return project.getProjects();
+            // if support project push down, then all input ref will be rewritten includes metadata

Review Comment:
   After debugging several failing cases and an offline discussion with @godfreyhe and @fsk119, we decided to roll this fix back to the original version, because `NestedProjectionUtil` is designed on the assumption that the input row type has already been rewritten.
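
A hypothetical illustration of the design assumption mentioned above, using plain field-name lists instead of Calcite types and made-up names: indices computed against a narrowed schema are only valid if the scan's row type was narrowed to match. The sketch assumes that a source without projection push-down keeps all physical columns and appends only the used metadata column, so the same indices resolve to the wrong fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration: narrowed-schema indices applied to an un-narrowed scan row type. */
class UnrewrittenRowTypeSketch {

    /** Index of each referenced field in a schema that keeps only referenced fields. */
    static List<Integer> indicesInNarrowedSchema(List<String> inputFields, List<Integer> refs) {
        List<String> narrowed = new ArrayList<>();
        List<Integer> out = new ArrayList<>();
        for (int ref : refs) {
            String name = inputFields.get(ref);
            if (!narrowed.contains(name)) {
                narrowed.add(name);
            }
            out.add(narrowed.indexOf(name));
        }
        return out;
    }

    /** Resolves indices against the row type the scan actually produces. */
    static List<String> resolve(List<String> scanFields, List<Integer> indices) {
        List<String> out = new ArrayList<>();
        for (int i : indices) {
            out.add(scanFields.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "m1", "m2");
        // SELECT c, m2
        List<Integer> rewritten = indicesInNarrowedSchema(input, Arrays.asList(2, 4)); // [0, 1]
        // Assumed scan row type without projection push-down: all physical columns + m2.
        List<String> scan = Arrays.asList("a", "b", "c", "m2");
        System.out.println(resolve(scan, rewritten)); // [a, b] instead of [c, m2]
    }
}
```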




[GitHub] [flink] lincoln-lil commented on a diff in pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

lincoln-lil commented on code in PR #20118:
URL: https://github.com/apache/flink/pull/20118#discussion_r913843116


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java:
##########
@@ -337,11 +339,69 @@ private RowType performPushDown(
     private List<RexNode> rewriteProjections(
             RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
         final LogicalProject project = call.rel(0);
+        List<RexNode> newProjects = project.getProjects();
+
         if (supportsProjectionPushDown(source.tableSource())) {
-            return NestedProjectionUtil.rewrite(
-                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
-        } else {
-            return project.getProjects();
+            // if support project push down, then all input ref will be rewritten includes metadata

Review Comment:
   Yes, good question. I've updated the PR to simplify the code.




[GitHub] [flink] lincoln-lil commented on pull request #20118: [FLINK-28334][table-planner] Fix PushProjectIntoTableSourceScanRule: covers the case when table source SupportsReadingMetadata and not SupportsProjectionPushDown

lincoln-lil commented on PR #20118:
URL: https://github.com/apache/flink/pull/20118#issuecomment-1174561623

   @flinkbot run azure

