Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/03/29 05:00:44 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request, #5770: improve Filter pushdown to Join

mingmwang opened a new pull request, #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770

   # Which issue does this PR close?
   
   Closes #.
   
   # Rationale for this change
   Improve the performance of some TPC-H queries and simplify the generated logical and physical plans.
   
   # What changes are included in this PR?
   
   1. Convert filters to join filters for Inner Join
   2. Avoid duplicated filters
   3. Fix the unstable physical HashJoin plan
   
   tpch-q7, tpch-q17, tpch-q19, and tpch-q20 are impacted by this PR.
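
The first change in the list can be pictured with a toy model. This is only a sketch under stated assumptions: `Side`, `Predicate`, `Placement`, `pred`, and `place_for_inner_join` are hypothetical names made up for illustration and are not DataFusion types or APIs. It shows the general idea that, for an inner join, a conjunct that references one input can move to that input, while a conjunct that references both inputs can be attached to the join as a join filter instead of staying in a separate Filter above it.

```rust
use std::collections::BTreeSet;

/// Which join input a column belongs to (toy model, not a DataFusion type).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Side {
    Left,
    Right,
}

/// One conjunct of a filter, reduced to its text and the sides it references.
#[derive(Debug, Clone, PartialEq)]
struct Predicate {
    text: &'static str,
    sides: BTreeSet<Side>,
}

/// Where each conjunct ends up after the rewrite.
#[derive(Debug, Default, PartialEq)]
struct Placement {
    left_input: Vec<&'static str>,
    right_input: Vec<&'static str>,
    join_filter: Vec<&'static str>,
}

/// Hypothetical constructor to keep the example short.
fn pred(text: &'static str, sides: &[Side]) -> Predicate {
    Predicate { text, sides: sides.iter().copied().collect() }
}

/// Assumed rule for an INNER join only: single-side conjuncts go to that
/// input, everything else is kept on the join itself as a join filter.
fn place_for_inner_join(conjuncts: &[Predicate]) -> Placement {
    let mut out = Placement::default();
    for p in conjuncts {
        let left = p.sides.contains(&Side::Left);
        let right = p.sides.contains(&Side::Right);
        match (left, right) {
            (true, false) => out.left_input.push(p.text),
            (false, true) => out.right_input.push(p.text),
            // Both sides (or no columns at all, a simplification here).
            _ => out.join_filter.push(p.text),
        }
    }
    out
}

fn main() {
    // Predicate texts loosely borrowed from the q17 plan, for flavor only.
    let conjuncts = vec![
        pred("l_quantity < __value", &[Side::Left, Side::Right]),
        pred("p_brand = 'Brand#23'", &[Side::Left]),
    ];
    println!("{:?}", place_for_inner_join(&conjuncts));
}
```

The sketch deliberately ignores outer joins, where moving a predicate across the join can change results.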
   
   # Are these changes tested?
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #5770: improve Filter pushdown to Join

Posted by "andygrove (via GitHub)" <gi...@apache.org>.
andygrove commented on PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#issuecomment-1490787975

   @mingmwang Could you share any performance numbers for the improvements for the affected queries?



[GitHub] [arrow-datafusion] Dandandan merged pull request #5770: improve Filter pushdown to Join

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan merged PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770



[GitHub] [arrow-datafusion] mingmwang commented on pull request #5770: improve Filter pushdown to Join

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#issuecomment-1491823259

   > @mingmwang Could you share any performance numbers for the improvements for the affected queries?
   
   Sure, will do. Unfortunately, the performance improvement is only slight. For q17, the major bottleneck is still the `Aggregation`.



[GitHub] [arrow-datafusion] mingmwang commented on pull request #5770: improve Filter pushdown to Join

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#issuecomment-1488163298

   @yahoNanJing 
   Please help review this PR.



[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5770: improve Filter pushdown to Join

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#discussion_r1154391387


##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -952,21 +952,21 @@ impl DefaultPhysicalPlanner {
 
                     let join_filter = match filter {
                         Some(expr) => {
-                            // Extract columns from filter expression
+                            // Extract columns from filter expression and saved in a HashSet
                             let cols = expr.to_columns()?;
 
-                            // Collect left & right field indices
+                            // Collect left & right field indices, the field indices are sorted in ascending order

Review Comment:
   Sure, will do.
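
The two comments added in the hunk above suggest why sorting matters: the columns come out of a `HashSet`, whose iteration order is unspecified in Rust, so anything derived from that order can differ between runs. A minimal sketch of the idea follows; `sorted_field_indices` and the slice-of-names schema are assumptions made for illustration and do not reproduce the planner code.

```rust
use std::collections::HashSet;

/// Hypothetical helper: map each referenced column name to its position in
/// `schema` and return the positions in ascending order.
fn sorted_field_indices(cols: &HashSet<String>, schema: &[&str]) -> Vec<usize> {
    let mut indices: Vec<usize> = cols
        .iter()
        // Columns missing from this schema are skipped (they belong elsewhere).
        .filter_map(|c| schema.iter().position(|f| *f == c.as_str()))
        .collect();
    // HashSet iteration order is unspecified, so sort to get a stable result.
    indices.sort_unstable();
    indices
}

fn main() {
    let schema = ["l_partkey", "l_quantity", "l_extendedprice"];
    let cols: HashSet<String> = ["l_extendedprice", "l_partkey"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    // Prints [0, 2] regardless of the set's internal order.
    println!("{:?}", sorted_field_indices(&cols, &schema));
}
```

Without the sort, the same query could yield differently ordered indices from run to run, which would make expected-plan files hard to compare.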




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5770: improve Filter pushdown to Join

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#discussion_r1154395711


##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                            |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                           |
-|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                               |
-|               |     Projection: lineitem.l_extendedprice                                                                                                                                        |
-|               |       Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey              |
-|               |         Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value                                           |
-|               |           Inner Join: part.p_partkey = __scalar_sq_1.l_partkey                                                                                                                  |
-|               |             Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey                                                                                 |
-|               |               Inner Join: lineitem.l_partkey = part.p_partkey                                                                                                                   |
-|               |                 TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]                                                                                         |
-|               |                 Projection: part.p_partkey                                                                                                                                      |
-|               |                   Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")                                                                                |
-|               |                     TableScan: part projection=[p_partkey, p_brand, p_container]                                                                                                |
-|               |             SubqueryAlias: __scalar_sq_1                                                                                                                                        |
-|               |               Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value                                                               |
-|               |                 Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]                                                                                    |
-|               |                   TableScan: lineitem projection=[l_partkey, l_quantity]                                                                                                        |
-| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly]                                                                                       |
-|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                       |
-|               |     CoalescePartitionsExec                                                                                                                                                      |
-|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                 |
-|               |         ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice]                                                                                                             |
-|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                           |
-|               |             FilterExec: CAST(l_quantity@1 AS Decimal128(30, 15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0                                          |
-|               |               ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_partkey@4 as l_partkey, __value@5 as __value] |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                     |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name: "l_partkey", index: 0 })]                        |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                 |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2), input_partitions=2                                                       |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           FilterExec: p_partkey@3 = l_partkey@0 AND l_partkey@0 = p_partkey@3                                                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                       |
-|               |                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })]          |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=0                                         |
-|               |                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                              |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2                                         |
-|               |                                       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                      |
-|               |                                         ProjectionExec: expr=[p_partkey@0 as p_partkey]                                                                                         |
-|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                           |
-|               |                                             FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX                                                                        |
-|               |                                               MemoryExec: partitions=0, partition_sizes=[]                                                                                      |
-|               |                     ProjectionExec: expr=[l_partkey@0 as l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value]                                               |
-|               |                       AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                     |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                |
-|               |                               AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                      |
-|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                    |
-|               |                                                                                                                                                                                 |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                                                                                                                                                                                                                                                                                                                  |
+|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                                                                                                                                                                                                                                                                                                                      |
+|               |     Projection: lineitem.l_extendedprice                                                                                                                                                                                                                                                                                                                                                                                                                               |
+|               |       Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))                                                                                                                                                                                                                                                                                                 |

Review Comment:
   > This is a better plan because the redundant
   > 
   > ```
   > Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey
   > ```
   > 
   > , which is already done by the earlier joins, is removed, right?
   
   Yes, the duplicated filters are removed. The original plan included duplicate filters because the `push_down_filter` rule infers additional filters and tries to push them down. If they cannot be pushed down, those inferred filters are added back to the Filter. This is unnecessary: the rule needs to distinguish the inferred filters from the original filters.
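
The distinction described in this reply can be sketched with a toy model. Everything here is an assumption for illustration: `Conjunct`, its `inferred` and `pushable` flags, and `split_filters` are hypothetical and are not part of the `push_down_filter` rule. The point is only that an inferred predicate which cannot be pushed down may be dropped, because it is implied by the originals, whereas an original predicate must be kept somewhere.

```rust
/// A conjunct tagged with its origin (toy model; names are hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct Conjunct {
    text: &'static str,
    /// True if the optimizer derived this predicate from others.
    inferred: bool,
    /// True if the predicate can be moved below the join.
    pushable: bool,
}

/// Returns (pushed down, kept in the Filter above the join).
fn split_filters(conjuncts: &[Conjunct]) -> (Vec<&'static str>, Vec<&'static str>) {
    let mut pushed = Vec::new();
    let mut kept = Vec::new();
    for c in conjuncts {
        if c.pushable {
            pushed.push(c.text);
        } else if !c.inferred {
            // Original predicates must survive somewhere in the plan.
            kept.push(c.text);
        }
        // Inferred predicates that cannot be pushed are dropped here instead
        // of being added back, since the originals already imply them.
    }
    (pushed, kept)
}

fn main() {
    let conjuncts = [
        Conjunct { text: "a.k = b.k", inferred: false, pushable: false },
        Conjunct { text: "b.k = a.k", inferred: true, pushable: false },
        Conjunct { text: "a.v > 1", inferred: false, pushable: true },
    ];
    let (pushed, kept) = split_filters(&conjuncts);
    println!("pushed = {:?}, kept = {:?}", pushed, kept);
}
```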




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5770: improve Filter pushdown to Join

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#discussion_r1152250301


##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                            |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                           |
-|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                               |
-|               |     Projection: lineitem.l_extendedprice                                                                                                                                        |
-|               |       Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey              |
-|               |         Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value                                           |
-|               |           Inner Join: part.p_partkey = __scalar_sq_1.l_partkey                                                                                                                  |
-|               |             Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey                                                                                 |
-|               |               Inner Join: lineitem.l_partkey = part.p_partkey                                                                                                                   |
-|               |                 TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]                                                                                         |
-|               |                 Projection: part.p_partkey                                                                                                                                      |
-|               |                   Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")                                                                                |
-|               |                     TableScan: part projection=[p_partkey, p_brand, p_container]                                                                                                |
-|               |             SubqueryAlias: __scalar_sq_1                                                                                                                                        |
-|               |               Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value                                                               |
-|               |                 Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]                                                                                    |
-|               |                   TableScan: lineitem projection=[l_partkey, l_quantity]                                                                                                        |
-| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly]                                                                                       |
-|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                       |
-|               |     CoalescePartitionsExec                                                                                                                                                      |
-|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                 |
-|               |         ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice]                                                                                                             |
-|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                           |
-|               |             FilterExec: CAST(l_quantity@1 AS Decimal128(30, 15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0                                          |
-|               |               ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_partkey@4 as l_partkey, __value@5 as __value] |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                     |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name: "l_partkey", index: 0 })]                        |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                 |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2), input_partitions=2                                                       |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           FilterExec: p_partkey@3 = l_partkey@0 AND l_partkey@0 = p_partkey@3                                                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                       |
-|               |                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })]          |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=0                                         |
-|               |                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                              |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2                                         |
-|               |                                       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                      |
-|               |                                         ProjectionExec: expr=[p_partkey@0 as p_partkey]                                                                                         |
-|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                           |
-|               |                                             FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX                                                                        |
-|               |                                               MemoryExec: partitions=0, partition_sizes=[]                                                                                      |
-|               |                     ProjectionExec: expr=[l_partkey@0 as l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value]                                               |
-|               |                       AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                     |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                |
-|               |                               AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                      |
-|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                    |
-|               |                                                                                                                                                                                 |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                                                                                                                                                                                                                                                                                                                  |
+|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                                                                                                                                                                                                                                                                                                                      |
+|               |     Projection: lineitem.l_extendedprice                                                                                                                                                                                                                                                                                                                                                                                                                               |
+|               |       Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))                                                                                                                                                                                                                                                                                                 |

Review Comment:
   This is a better plan because the redundant filter
   ```
   Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey
   ```
   which is already applied by the earlier joins, is removed, right?
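To illustrate why that filter is redundant, here is a minimal sketch, not the PR's implementation. It assumes a hypothetical `Equality` type that holds two column names, and treats `a = b` and `b = a` as the same predicate. Any filter equality that the join keys already enforce, or that repeats an earlier one, is dropped.

```rust
use std::collections::HashSet;

/// Hypothetical equality predicate between two column names,
/// e.g. ("part.p_partkey", "lineitem.l_partkey"). Not a DataFusion type.
type Equality = (String, String);

/// Order the two sides so that `a = b` and `b = a` compare equal.
fn normalize(eq: &Equality) -> Equality {
    if eq.0 <= eq.1 {
        eq.clone()
    } else {
        (eq.1.clone(), eq.0.clone())
    }
}

/// Keep only the filter equalities that are neither repeated nor already
/// enforced by the join keys. Results are returned in normalized form.
fn residual_filters(filters: &[Equality], join_keys: &[Equality]) -> Vec<Equality> {
    // Seed the set with the join keys so that matching filters are rejected.
    let mut seen: HashSet<Equality> = join_keys.iter().map(normalize).collect();
    filters
        .iter()
        .map(normalize)
        // `insert` returns false when the predicate was already present.
        .filter(|eq| seen.insert(eq.clone()))
        .collect()
}
```

With the join key `lineitem.l_partkey = part.p_partkey`, both halves of the filter quoted above normalize to that same predicate, so `residual_filters` returns an empty list for them.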



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -952,21 +952,21 @@ impl DefaultPhysicalPlanner {
 
                     let join_filter = match filter {
                         Some(expr) => {
-                            // Extract columns from filter expression
+                            // Extract columns from filter expression and saved in a HashSet
                             let cols = expr.to_columns()?;
 
-                            // Collect left & right field indices
+                            // Collect left & right field indices, the field indices are sorted in ascending order

Review Comment:
   If the sort order is important for later stages, can you make a note about the rationale, so that the comment explains why the sorting matters in addition to noting that the output is sorted?
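One plausible rationale, sketched below under assumptions: a `HashSet` iterates in an unspecified order, so indices collected from it can come out in a different order from run to run, which would make the printed plan unstable. The `ColumnRef` type and `split_indices` function are hypothetical stand-ins, not the planner's actual code.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a column reference: (is_left_side, field_index).
type ColumnRef = (bool, usize);

/// Split the columns referenced by a join filter into left and right field
/// indices. HashSet iteration order is unspecified, so each list is sorted
/// to keep the result deterministic.
fn split_indices(cols: &HashSet<ColumnRef>) -> (Vec<usize>, Vec<usize>) {
    let mut left: Vec<usize> = cols.iter().filter(|c| c.0).map(|c| c.1).collect();
    let mut right: Vec<usize> = cols.iter().filter(|c| !c.0).map(|c| c.1).collect();
    // Without these sorts, the order would depend on the hash iteration order.
    left.sort_unstable();
    right.sort_unstable();
    (left, right)
}
```

Sorting costs little here, since a filter references only a handful of columns, and it makes the output independent of hash iteration order.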



##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                            |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                           |
-|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                               |
-|               |     Projection: lineitem.l_extendedprice                                                                                                                                        |
-|               |       Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey              |
-|               |         Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value                                           |
-|               |           Inner Join: part.p_partkey = __scalar_sq_1.l_partkey                                                                                                                  |
-|               |             Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey                                                                                 |
-|               |               Inner Join: lineitem.l_partkey = part.p_partkey                                                                                                                   |
-|               |                 TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]                                                                                         |
-|               |                 Projection: part.p_partkey                                                                                                                                      |
-|               |                   Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")                                                                                |
-|               |                     TableScan: part projection=[p_partkey, p_brand, p_container]                                                                                                |
-|               |             SubqueryAlias: __scalar_sq_1                                                                                                                                        |
-|               |               Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value                                                               |
-|               |                 Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]                                                                                    |
-|               |                   TableScan: lineitem projection=[l_partkey, l_quantity]                                                                                                        |
-| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly]                                                                                       |
-|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                       |
-|               |     CoalescePartitionsExec                                                                                                                                                      |
-|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                 |
-|               |         ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice]                                                                                                             |
-|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                           |
-|               |             FilterExec: CAST(l_quantity@1 AS Decimal128(30, 15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0                                          |
-|               |               ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_partkey@4 as l_partkey, __value@5 as __value] |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                     |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name: "l_partkey", index: 0 })]                        |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                 |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2), input_partitions=2                                                       |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           FilterExec: p_partkey@3 = l_partkey@0 AND l_partkey@0 = p_partkey@3                                                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                       |
-|               |                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })]          |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=0                                         |
-|               |                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                              |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2                                         |
-|               |                                       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                      |
-|               |                                         ProjectionExec: expr=[p_partkey@0 as p_partkey]                                                                                         |
-|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                           |
-|               |                                             FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX                                                                        |
-|               |                                               MemoryExec: partitions=0, partition_sizes=[]                                                                                      |
-|               |                     ProjectionExec: expr=[l_partkey@0 as l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value]                                               |
-|               |                       AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                     |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                |
-|               |                               AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                      |
-|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                    |
-|               |                                                                                                                                                                                 |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                                                                                                                                                                                                                                                                                                                  |
+|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                                                                                                                                                                                                                                                                                                                      |
+|               |     Projection: lineitem.l_extendedprice                                                                                                                                                                                                                                                                                                                                                                                                                               |
+|               |       Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))                                                                                                                                                                                                                                                                                                 |

Review Comment:
   It also pushes the filter
   
   ```
   |               |       Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey              |
   ```
   
   into the Join, which seems like a win to me (it avoids generating join output that would only be filtered out afterwards)
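The difference can be sketched with a toy nested-loop join. This is an illustration under assumptions, not how `HashJoinExec` works: `Row`, `join_then_filter`, and `join_with_filter` are hypothetical names. Both functions return the same rows, but the second never materializes pairs that the predicate rejects.

```rust
/// Hypothetical row: (join key, value).
type Row = (u32, i64);

/// Inner join on the key, then filter the materialized join output.
/// Returns the surviving pairs and the number of pairs materialized.
fn join_then_filter(
    left: &[Row],
    right: &[Row],
    pred: impl Fn(&Row, &Row) -> bool,
) -> (Vec<(Row, Row)>, usize) {
    let mut joined = Vec::new();
    for l in left {
        for r in right {
            if l.0 == r.0 {
                joined.push((*l, *r));
            }
        }
    }
    let materialized = joined.len();
    joined.retain(|(l, r)| pred(l, r));
    (joined, materialized)
}

/// Inner join on the key with the predicate evaluated inside the join,
/// so rejected pairs are never pushed to the output.
fn join_with_filter(
    left: &[Row],
    right: &[Row],
    pred: impl Fn(&Row, &Row) -> bool,
) -> (Vec<(Row, Row)>, usize) {
    let mut out = Vec::new();
    for l in left {
        for r in right {
            if l.0 == r.0 && pred(l, r) {
                out.push((*l, *r));
            }
        }
    }
    let materialized = out.len();
    (out, materialized)
}
```

For a predicate such as `l.1 < r.1`, the two functions agree on the result, while the first one reports a larger materialized count whenever the predicate rejects any matching pair.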




[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #5770: improve Filter pushdown to Join

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#discussion_r1154410295


##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                            |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                           |
-|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                               |
-|               |     Projection: lineitem.l_extendedprice                                                                                                                                        |
-|               |       Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey              |
-|               |         Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value                                           |
-|               |           Inner Join: part.p_partkey = __scalar_sq_1.l_partkey                                                                                                                  |
-|               |             Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey                                                                                 |
-|               |               Inner Join: lineitem.l_partkey = part.p_partkey                                                                                                                   |
-|               |                 TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]                                                                                         |
-|               |                 Projection: part.p_partkey                                                                                                                                      |
-|               |                   Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")                                                                                |
-|               |                     TableScan: part projection=[p_partkey, p_brand, p_container]                                                                                                |
-|               |             SubqueryAlias: __scalar_sq_1                                                                                                                                        |
-|               |               Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value                                                               |
-|               |                 Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]                                                                                    |
-|               |                   TableScan: lineitem projection=[l_partkey, l_quantity]                                                                                                        |
-| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly]                                                                                       |
-|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                       |
-|               |     CoalescePartitionsExec                                                                                                                                                      |
-|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)]                                                                                                 |
-|               |         ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice]                                                                                                             |
-|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                           |
-|               |             FilterExec: CAST(l_quantity@1 AS Decimal128(30, 15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0                                          |
-|               |               ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_partkey@4 as l_partkey, __value@5 as __value] |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                     |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name: "l_partkey", index: 0 })]                        |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                 |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2), input_partitions=2                                                       |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           FilterExec: p_partkey@3 = l_partkey@0 AND l_partkey@0 = p_partkey@3                                                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                       |
-|               |                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })]          |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=0                                         |
-|               |                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                              |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2                                         |
-|               |                                       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                      |
-|               |                                         ProjectionExec: expr=[p_partkey@0 as p_partkey]                                                                                         |
-|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                           |
-|               |                                             FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX                                                                        |
-|               |                                               MemoryExec: partitions=0, partition_sizes=[]                                                                                      |
-|               |                     ProjectionExec: expr=[l_partkey@0 as l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value]                                               |
-|               |                       AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                     |
-|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                             |
-|               |                           RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2                                                   |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                |
-|               |                               AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]                                                      |
-|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                    |
-|               |                                                                                                                                                                                 |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly                                                                                                                                                                                                                                                                                                                                                                                  |
+|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]                                                                                                                                                                                                                                                                                                                                                                                                      |
+|               |     Projection: lineitem.l_extendedprice                                                                                                                                                                                                                                                                                                                                                                                                                               |
+|               |       Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))                                                                                                                                                                                                                                                                                                 |

Review Comment:
   Nice, thanks for the explanation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jackwener commented on pull request #5770: improve Filter pushdown to Join

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#issuecomment-1492225783

   This PR reminds me: I have also noticed some possible optimizations.
   
   We could open an `EPIC` issue listing optimizer tasks to collect optimizations like #5546.

