Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/18 13:37:50 UTC

[GitHub] [arrow] Dandandan opened a new pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Dandandan opened a new pull request #8961:
URL: https://github.com/apache/arrow/pull/8961


   This PR uses the `num_rows` statistics to implement a common optimization: using the smaller table for the build phase.
   This is a good heuristic, as there are fewer items to insert into the hash table and table sizes can be very imbalanced.
   
   Some notes:
   
   * The optimization works on the `LogicalPlan` by swapping left and right, the join type and the key order. This currently seems the easiest place to add it, as there is no cost-based optimizer and/or optimizers on the physical plan yet. The optimization rule assumes that the left part of the join will be used for the build phase and the right part for the probe phase.
   * It requires the number of rows to be exactly known, so it will not work whenever there is a transformation changing the number of rows, except for `limit`. The idea here is that in other cases, it is very hard to estimate the number of resulting rows.
   * The impact is currently negative, as the hash join implementation currently seems to be slower when the right side of the join is bigger. That seems strange and unexpected, but it seems better to disable this optimization until that is "fixed".
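
The swap described in the notes above can be sketched roughly as follows. This is a minimal illustration on a toy plan type, not the PR's code: `Plan`, `JoinType`, `num_rows`, `swap_join_type` and `reorder` are all hypothetical names, and the sketch ignores that swapping the inputs also changes the output column order.

```rust
/// Join types in this sketch; a hypothetical subset, not DataFusion's enum.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
}

/// A toy logical plan: a scan with an optionally known row count, or a join.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { name: String, num_rows: Option<usize> },
    Join {
        left: Box<Plan>,
        right: Box<Plan>,
        on: Vec<(String, String)>,
        join_type: JoinType,
    },
}

/// Exact row count; in this sketch it is only known for scans.
fn num_rows(plan: &Plan) -> Option<usize> {
    match plan {
        Plan::Scan { num_rows, .. } => *num_rows,
        Plan::Join { .. } => None,
    }
}

/// Mirror the join type so the result is unchanged after swapping the inputs.
fn swap_join_type(t: JoinType) -> JoinType {
    match t {
        JoinType::Inner => JoinType::Inner,
        JoinType::Left => JoinType::Right,
        JoinType::Right => JoinType::Left,
    }
}

/// Put the smaller input on the left (build) side when both counts are known.
pub fn reorder(plan: Plan) -> Plan {
    match plan {
        Plan::Join { left, right, on, join_type } => {
            let left = Box::new(reorder(*left));
            let right = Box::new(reorder(*right));
            let should_swap = matches!(
                (num_rows(&left), num_rows(&right)),
                (Some(l), Some(r)) if l > r
            );
            if should_swap {
                Plan::Join {
                    left: right,
                    right: left,
                    // Key pairs are (left column, right column), so flip each pair.
                    on: on.into_iter().map(|(l, r)| (r, l)).collect(),
                    join_type: swap_join_type(join_type),
                }
            } else {
                Plan::Join { left, right, on, join_type }
            }
        }
        other => other,
    }
}
```

If either row count is unknown, the sketch leaves the join as written, which matches the note that the rule needs exact counts.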
   
    FYI @andygrove @jorgecarleitao 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545974057



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {

Review comment:
       Some databases use the `STRAIGHT_JOIN` modifier to force joins to happen in the user-specified order. This is from Impala docs:
   
   > If statistics are not available for all the tables in the join query, or if Impala chooses a join order that is not the most efficient, you can override the automatic join order optimization by specifying the STRAIGHT_JOIN keyword immediately after the SELECT and any DISTINCT or ALL keywords. In this case, Impala uses the order the tables appear in the query to guide how the joins are processed. 
   
   I think we can merge this PR as is and continue this discussion. Spark's AQE approach would mean that we have the statistics, but only if we load both sides into memory first (or scan them first for row counts) which would possibly defeat the point of this optimization. It would also mean that the next operator in the query plan wouldn't be able to start streaming until the join has completed? This is a tricky area.
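
As a rough illustration of the `STRAIGHT_JOIN` idea, a reordering rule could consult an opt-out flag before swapping. The sketch below is hypothetical: nothing in this thread adds such a flag to DataFusion, and `Order`, `join_order` and the `straight_join` parameter are names invented for the example.

```rust
/// Outcome of the ordering decision in this sketch.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Order {
    /// Keep the inputs in the order the user wrote them.
    AsWritten,
    /// Swap the inputs so the smaller one becomes the build side.
    Swapped,
}

/// Decide the join order from exact row counts, unless the user opted out.
/// `straight_join` is a hypothetical flag modelled on the Impala keyword.
pub fn join_order(
    left_rows: Option<usize>,
    right_rows: Option<usize>,
    straight_join: bool,
) -> Order {
    if straight_join {
        // The user asked for the written order; skip the optimization.
        return Order::AsWritten;
    }
    match (left_rows, right_rows) {
        // Only swap when both counts are known and the left side is larger.
        (Some(l), Some(r)) if l > r => Order::Swapped,
        _ => Order::AsWritten,
    }
}
```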







[GitHub] [arrow] Dandandan edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748437288


   I checked merging the other PR https://github.com/apache/arrow/pull/8965 which improves the join implementation.
   
   Besides the join being much faster regardless of this PR, reordering gives a further ~15% reduction in time for the following query (6001214 vs 1499999 rows):
   
   ```
   select
                   l_shipmode,
                   sum(case
                       when o_orderpriority = '1-URGENT'
                           or o_orderpriority = '2-HIGH'
                           then 1
                       else 0
                   end) as high_line_count,
                   sum(case
                       when o_orderpriority <> '1-URGENT'
                           and o_orderpriority <> '2-HIGH'
                           then 1
                       else 0
                   end) as low_line_count
               from
                   lineitem
               join
                   orders
               on
                   l_orderkey = o_orderkey
               group by
                   l_shipmode
               order by
               l_shipmode;
   ```
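
To make the build vs probe cost concrete, here is a toy single-key inner hash join that counts hash-table insertions. It is a sketch under simplifying assumptions (integer keys, string payloads, everything in memory) and is not the DataFusion implementation; `hash_join` is a hypothetical name.

```rust
use std::collections::HashMap;

/// Inner hash join on a single integer key.
/// `build` is inserted into the hash table; `probe` is streamed against it.
/// Returns the matched (build payload, probe payload) pairs and the number
/// of hash-table insertions performed during the build phase.
pub fn hash_join<'a>(
    build: &[(u64, &'a str)],
    probe: &[(u64, &'a str)],
) -> (Vec<(&'a str, &'a str)>, usize) {
    let mut table: HashMap<u64, Vec<&'a str>> = HashMap::new();
    let mut inserts = 0;
    // Build phase: one insertion per build-side row.
    for &(key, payload) in build {
        table.entry(key).or_default().push(payload);
        inserts += 1;
    }
    // Probe phase: one lookup per probe-side row.
    let mut out = Vec::new();
    for &(key, payload) in probe {
        if let Some(matches) = table.get(&key) {
            for &b in matches {
                out.push((b, payload));
            }
        }
    }
    (out, inserts)
}
```

Calling it with the smaller table as `build` produces the same number of joined rows as the other order, but with fewer insertions, which is the effect the reordering aims for.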





[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545873082



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {

Review comment:
       Not sure if this is relevant. Where is this used?
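
For context, the kind of traversal `get_num_rows` performs might look like the sketch below on a toy plan type. The variants and fields are assumptions made for illustration (in particular `produce_one_row` on `EmptyRelation` and the `min` handling of `Limit`); the truncated diff above does not show how the PR treats these cases.

```rust
/// A toy logical plan; the variants and fields are assumptions for this sketch.
pub enum Plan {
    TableScan { num_rows: Option<usize> },
    Projection { input: Box<Plan> },
    Sort { input: Box<Plan> },
    Limit { n: usize, input: Box<Plan> },
    Filter { input: Box<Plan> },
    EmptyRelation { produce_one_row: bool },
}

/// Returns the exact number of output rows, or `None` when it is not known.
pub fn get_num_rows(plan: &Plan) -> Option<usize> {
    match plan {
        // A scan knows its row count only if the source reports statistics.
        Plan::TableScan { num_rows } => *num_rows,
        // Projection and sort never change the number of rows.
        Plan::Projection { input } | Plan::Sort { input } => get_num_rows(input),
        // A limit caps a known input count (assumed handling, see lead-in).
        Plan::Limit { n, input } => get_num_rows(input).map(|rows| rows.min(*n)),
        // A filter makes the count unknown without selectivity estimates.
        Plan::Filter { .. } => None,
        // An empty relation yields zero rows, or one placeholder row.
        Plan::EmptyRelation { produce_one_row } => {
            Some(if *produce_one_row { 1 } else { 0 })
        }
    }
}
```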










[GitHub] [arrow] andygrove commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748218797


   I do think that we should start looking at optimizations on the physical plan and eventually move this optimization there. I also do think that an adaptive execution approach makes sense, especially in a distributed context. I think it might not work so well for the current single node / in-process execution approach though.





[GitHub] [arrow] andygrove commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748151628


   Thanks @Dandandan this is looking great :rocket: 





[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545960387



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {

Review comment:
       I think that could indeed be very beneficial, but estimating it before execution can be really hard or even impossible.
   
   Also, if using the left side as the build side is wrong, at this moment the user can change the order by changing the query itself.
   
   I think ideally you should be able to know more about the table size while the query is executing (à la Spark 3 adaptive query execution) so you don't do the wrong thing. BigQuery also has a nice strategy / explanation for this: https://cloud.google.com/bigquery/query-plan-explanation This probably requires quite a few changes on the execution / planning side, but it would make many more statistics available at each step during execution, so the plan could be optimized further.
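
A very rough sketch of the adaptive idea: once both inputs have been materialized, the build side can be picked from the row counts actually observed rather than from planning-time statistics. All names here (`BuildSide`, `observed_rows`, `choose_build_side`) are hypothetical, and each input is modelled simply as a list of per-batch row counts.

```rust
/// Which input is used to build the hash table.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum BuildSide {
    Left,
    Right,
}

/// Total rows observed across the materialized batches of one input.
/// Each element is the row count of one batch.
pub fn observed_rows(batches: &[usize]) -> usize {
    batches.iter().sum()
}

/// Choose the build side at execution time from observed row counts.
/// Ties keep the left side, matching the planning-time default.
pub fn choose_build_side(left_batches: &[usize], right_batches: &[usize]) -> BuildSide {
    if observed_rows(right_batches) < observed_rows(left_batches) {
        BuildSide::Right
    } else {
        BuildSide::Left
    }
}
```

The catch is that the counts are only exact once both inputs have been fully read, so the decision cannot be made while either side is still streaming.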
   







[GitHub] [arrow] Dandandan commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748492267


   I wrote some details of the PRs for a planned blog post.
   
   https://docs.google.com/document/d/1Urxm34rl8DZ5D0vyhlrrBoZK6IHW7WFRN3hsaTfPujg/edit?usp=drivesdk





[GitHub] [arrow] codecov-io edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748106185


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=h1) Report
   > Merging [#8961](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=desc) (959ce86) into [master](https://codecov.io/gh/apache/arrow/commit/a054c78813c68b99abc0df9db87ca1e530d324d7?el=desc) (a054c78) will **decrease** coverage by `0.03%`.
   > The diff coverage is `61.70%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8961      +/-   ##
   ==========================================
   - Coverage   83.20%   83.17%   -0.04%     
   ==========================================
     Files         199      200       +1     
     Lines       48857    48946      +89     
   ==========================================
   + Hits        40651    40709      +58     
   - Misses       8206     8237      +31     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/logical\_plan/plan.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9sb2dpY2FsX3BsYW4vcGxhbi5ycw==) | `88.12% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfdXRpbHMucnM=) | `97.10% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BsYW5uZXIucnM=) | `80.45% <ø> (ø)` | |
   | [rust/datafusion/src/sql/parser.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGFyc2VyLnJz) | `86.87% <ø> (ø)` | |
   | [...datafusion/src/optimizer/hash\_build\_probe\_order.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvaGFzaF9idWlsZF9wcm9iZV9vcmRlci5ycw==) | `59.09% <59.09%> (ø)` | |
   | [rust/datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `90.00% <100.00%> (+0.01%)` | :arrow_up: |
   | [...t/datafusion/src/optimizer/projection\_push\_down.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvcHJvamVjdGlvbl9wdXNoX2Rvd24ucnM=) | `97.70% <100.00%> (ø)` | |
   | [rust/datafusion/src/optimizer/utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvdXRpbHMucnM=) | `61.75% <100.00%> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.77% <100.00%> (ø)` | |
   | [rust/datafusion/src/sql/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGxhbm5lci5ycw==) | `84.79% <100.00%> (ø)` | |
   | ... and [3 more](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree-more) | |
   
   





[GitHub] [arrow] Dandandan commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748437288


   I checked merging this https://github.com/apache/arrow/pull/8965 
   
   Besides the join being ~20-50x faster regardless of this PR, reordering gives a further ~15% reduction in time for the following query (6001214 vs 1499999 rows):
   
   ```
   select
                   l_shipmode,
                   sum(case
                       when o_orderpriority = '1-URGENT'
                           or o_orderpriority = '2-HIGH'
                           then 1
                       else 0
                   end) as high_line_count,
                   sum(case
                       when o_orderpriority <> '1-URGENT'
                           and o_orderpriority <> '2-HIGH'
                           then 1
                       else 0
                   end) as low_line_count
               from
                   lineitem
               join
                   orders
               on
                   l_orderkey = o_orderkey
               group by
                   l_shipmode
               order by
               l_shipmode;
   ```








[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545960387



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {

Review comment:
       Indeed, I think that could be very beneficial, but estimating it before execution might be really hard or even impossible?
   
   Also, if using the left side as the build side turns out to be wrong, the user can currently change the order by rewriting the query itself.
   
   I think ideally you would know more about the table sizes while the query is executing (a la Spark 3 adaptive query execution), so you don't do the wrong thing. BigQuery also has a nice strategy and explanation for this (https://cloud.google.com/bigquery/query-plan-explanation). This probably requires quite a few changes on the execution / planning side, but it would make many more statistics available at each step during execution, so the plan could be changed or optimized further.
   







[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545959888



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {

Review comment:
       This looks good. This is used for projections without an input, such as `SELECT 1`.
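
The row-count rule for `EmptyRelation` can be illustrated on a toy plan type. This is a minimal sketch, not the DataFusion code: `Plan` and `exact_num_rows` are hypothetical names, and the `Plan` enum is a simplified stand-in for `LogicalPlan` that only models the cases relevant here.

```rust
/// Simplified, hypothetical stand-in for DataFusion's `LogicalPlan`.
#[allow(dead_code)]
enum Plan {
    /// `produce_one_row` is true for queries such as `SELECT 1`.
    EmptyRelation { produce_one_row: bool },
    /// A scan whose source may or may not report an exact row count.
    TableScan { num_rows: Option<usize> },
    Projection { input: Box<Plan> },
    Limit { n: usize, input: Box<Plan> },
    Filter { input: Box<Plan> },
}

/// Returns the exact number of output rows, or `None` when it is unknown.
#[allow(dead_code)]
fn exact_num_rows(plan: &Plan) -> Option<usize> {
    match plan {
        // An empty relation yields either exactly one row or none.
        Plan::EmptyRelation { produce_one_row } => {
            Some(if *produce_one_row { 1 } else { 0 })
        }
        Plan::TableScan { num_rows } => *num_rows,
        // A projection does not change the number of rows.
        Plan::Projection { input } => exact_num_rows(input),
        // A limit caps the row count of its input, if that count is known.
        Plan::Limit { n, input } => exact_num_rows(input).map(|rows| rows.min(*n)),
        // A filter changes the row count in a way that is not known up front.
        Plan::Filter { .. } => None,
    }
}
```

Under these assumptions, a projection over an `EmptyRelation` with `produce_one_row` set (the `SELECT 1` shape) reports exactly one row, while anything behind a filter reports `None`.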







[GitHub] [arrow] andygrove commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748646647


   I merged with master locally and tested it and see a speedup. I will merge when CI is green. Thanks @Dandandan 





[GitHub] [arrow] Dandandan edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748437288


   I checked merging the other PR https://github.com/apache/arrow/pull/8965 which improves the join implementation.
   
   Besides the join being much faster regardless of this PR, reordering gives a further ~15% reduction in time for the following query (6001214 vs 1499999 rows):
   
   ```
   select
                   l_shipmode,
                   sum(case
                       when o_orderpriority = '1-URGENT'
                           or o_orderpriority = '2-HIGH'
                           then 1
                       else 0
                   end) as high_line_count,
                   sum(case
                       when o_orderpriority <> '1-URGENT'
                           and o_orderpriority <> '2-HIGH'
                           then 1
                       else 0
                   end) as low_line_count
               from
                   lineitem
               join
                   orders
               on
                   l_orderkey = o_orderkey
               group by
                   l_shipmode
               order by
                   l_shipmode;
   ```
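
For this query the swap decision reduces to a comparison of the two row counts. The sketch below mirrors the `should_swap_join_order` logic from the PR but takes the counts directly. `should_swap` is a hypothetical helper name, and it is an assumption here that the larger count (6001214) belongs to `lineitem`, the left input of the join.

```rust
/// Decides whether to swap the join inputs so that the smaller one
/// ends up on the left (build) side. Hypothetical helper for illustration.
#[allow(dead_code)]
fn should_swap(left_rows: Option<usize>, right_rows: Option<usize>) -> bool {
    match (left_rows, right_rows) {
        // Swap only when both counts are known and the left side is larger.
        (Some(l), Some(r)) => l > r,
        // If either count is unknown, keep the order written in the query.
        _ => false,
    }
}
```

With 6001214 rows on the left and 1499999 on the right the function returns `true`, so the smaller input becomes the build side. When either count is missing, the order from the query is kept.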





[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r546039700



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                if should_swap_join_order(left, right) {
+                    // Swap left and right, change join type and (equi-)join key order
+                    Ok(LogicalPlan::Join {
+                        left: right.clone(),
+                        right: left.clone(),
+                        on: on
+                            .iter()
+                            .map(|(l, r)| (r.to_string(), l.to_string()))

Review comment:
       Filed here https://issues.apache.org/jira/browse/ARROW-10965
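
For readers following the quoted hunk, the swap itself (inputs, join type and key order) can be sketched on a toy join node. The `Join` struct, the three-variant `JoinType` and `swap_sides` are hypothetical simplifications and not the DataFusion types. Inputs are represented by plain table names.

```rust
/// Simplified, hypothetical join type with only the three variants used here.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
}

/// Toy join node: inputs are table names, `on` holds (left key, right key) pairs.
#[derive(Debug, Clone, PartialEq)]
struct Join {
    left: String,
    right: String,
    on: Vec<(String, String)>,
    join_type: JoinType,
}

/// Returns an equivalent join with the two inputs exchanged.
#[allow(dead_code)]
fn swap_sides(join: &Join) -> Join {
    Join {
        left: join.right.clone(),
        right: join.left.clone(),
        // Each key pair is flipped so that the first key still refers to the left input.
        on: join
            .on
            .iter()
            .map(|(l, r)| (r.clone(), l.clone()))
            .collect(),
        // A left join becomes a right join and vice versa; inner joins are symmetric.
        join_type: match join.join_type {
            JoinType::Inner => JoinType::Inner,
            JoinType::Left => JoinType::Right,
            JoinType::Right => JoinType::Left,
        },
    }
}
```

Swapping twice returns the original node, which is a cheap sanity check that the three parts of the swap stay consistent with each other.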







[GitHub] [arrow] Dandandan edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748437288


   I checked merging the other PR https://github.com/apache/arrow/pull/8965 which improves the join implementation.
   
   Besides the join being ~20-50x faster regardless of this PR, reordering gives a further ~15% reduction in time for the following query (6001214 vs 1499999 rows):
   
   ```
   select
                   l_shipmode,
                   sum(case
                       when o_orderpriority = '1-URGENT'
                           or o_orderpriority = '2-HIGH'
                           then 1
                       else 0
                   end) as high_line_count,
                   sum(case
                       when o_orderpriority <> '1-URGENT'
                           and o_orderpriority <> '2-HIGH'
                           then 1
                       else 0
                   end) as low_line_count
               from
                   lineitem
               join
                   orders
               on
                   l_orderkey = o_orderkey
               group by
                   l_shipmode
               order by
                   l_shipmode;
   ```





[GitHub] [arrow] Dandandan commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748645892


   @andygrove updated & enabled the optimization now.





[GitHub] [arrow] codecov-io commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748106185


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=h1) Report
   > Merging [#8961](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=desc) (7b7e1b8) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **decrease** coverage by `0.02%`.
   > The diff coverage is `53.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8961/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8961      +/-   ##
   ==========================================
   - Coverage   83.25%   83.23%   -0.03%     
   ==========================================
     Files         196      197       +1     
     Lines       48116    48201      +85     
   ==========================================
   + Hits        40059    40119      +60     
   - Misses       8057     8082      +25     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/logical\_plan/plan.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9sb2dpY2FsX3BsYW4vcGxhbi5ycw==) | `87.93% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BsYW5uZXIucnM=) | `80.54% <ø> (ø)` | |
   | [rust/datafusion/src/sql/parser.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGFyc2VyLnJz) | `86.87% <ø> (ø)` | |
   | [...datafusion/src/optimizer/hash\_build\_probe\_order.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvaGFzaF9idWlsZF9wcm9iZV9vcmRlci5ycw==) | `52.17% <52.17%> (ø)` | |
   | [rust/datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `89.97% <100.00%> (+0.01%)` | :arrow_up: |
   | [rust/datafusion/src/sql/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGxhbm5lci5ycw==) | `81.72% <100.00%> (ø)` | |
   | [rust/parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9lbmNvZGluZ3MvZW5jb2RpbmcucnM=) | `95.24% <0.00%> (-0.20%)` | :arrow_down: |
   | [rust/parquet/src/file/statistics.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9maWxlL3N0YXRpc3RpY3MucnM=) | `93.80% <0.00%> (ø)` | |
   | [rust/parquet/src/arrow/arrow\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9hcnJvd19yZWFkZXIucnM=) | `90.69% <0.00%> (+0.10%)` | :arrow_up: |
   | ... and [6 more](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=footer). Last update [d65ba4e...7b7e1b8](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545946880



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                if should_swap_join_order(left, right) {
+                    // Swap left and right, change join type and (equi-)join key order
+                    Ok(LogicalPlan::Join {

Review comment:
       We should add a unit test for this so that we know the answer to that question, I think.







[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545960387



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {

Review comment:
       Indeed, I think that could be very beneficial, but estimating it before execution might be really hard or even impossible?
   
   Also, if using the left side as the build side turns out to be wrong, the user can currently change the order by rewriting the query itself. You lose that option with a heuristic that can be wrong (or you have to provide some other mechanism, e.g. a query hint).
   
   I think ideally you would know more about the table sizes while the query is executing (a la Spark 3 adaptive query execution), so you don't do the wrong thing. BigQuery also has a nice strategy and explanation for this (https://cloud.google.com/bigquery/query-plan-explanation). This probably requires quite a few changes on the execution / planning side, but it would make many more statistics available at each step during execution, so the plan could be changed or optimized further.
   







[GitHub] [arrow] Dandandan commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748362891


   Also related to https://github.com/apache/arrow/pull/8965 which stops generating/using an index for the probe side.





[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545926999



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {

Review comment:
       :+1: 







[GitHub] [arrow] codecov-io edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748106185


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=h1) Report
   > Merging [#8961](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=desc) (1430e0d) into [master](https://codecov.io/gh/apache/arrow/commit/a054c78813c68b99abc0df9db87ca1e530d324d7?el=desc) (a054c78) will **decrease** coverage by `0.03%`.
   > The diff coverage is `61.70%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8961/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8961      +/-   ##
   ==========================================
   - Coverage   83.20%   83.16%   -0.04%     
   ==========================================
     Files         199      200       +1     
     Lines       48857    48946      +89     
   ==========================================
   + Hits        40651    40708      +57     
   - Misses       8206     8238      +32     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/logical\_plan/plan.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9sb2dpY2FsX3BsYW4vcGxhbi5ycw==) | `88.12% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfdXRpbHMucnM=) | `97.10% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BsYW5uZXIucnM=) | `80.45% <ø> (ø)` | |
   | [rust/datafusion/src/sql/parser.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGFyc2VyLnJz) | `86.87% <ø> (ø)` | |
   | [...datafusion/src/optimizer/hash\_build\_probe\_order.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvaGFzaF9idWlsZF9wcm9iZV9vcmRlci5ycw==) | `59.09% <59.09%> (ø)` | |
   | [rust/datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `90.00% <100.00%> (+0.01%)` | :arrow_up: |
   | [...t/datafusion/src/optimizer/projection\_push\_down.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvcHJvamVjdGlvbl9wdXNoX2Rvd24ucnM=) | `97.70% <100.00%> (ø)` | |
   | [rust/datafusion/src/optimizer/utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvdXRpbHMucnM=) | `61.75% <100.00%> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.77% <100.00%> (ø)` | |
   | [rust/datafusion/src/sql/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGxhbm5lci5ycw==) | `84.79% <100.00%> (ø)` | |
   | ... and [4 more](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=footer). Last update [a054c78...1430e0d](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove closed pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #8961:
URL: https://github.com/apache/arrow/pull/8961


   





[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545903205



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                if should_swap_join_order(left, right) {
+                    // Swap left and right, change join type and (equi-)join key order
+                    Ok(LogicalPlan::Join {

Review comment:
       I'm not sure if it is relevant, but when I have done this in other projects, I have wrapped the swapped join in a projection to preserve the column ordering of the output. This would be less surprising to a user if for some reason there is no final projection.
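
As a rough illustration of the suggestion above, the sketch below models a plan only by the list of column names it outputs. The helpers `join_output` and `project` are hypothetical and are not part of DataFusion; they only show that swapping the join inputs changes the output column order and that a projection placed on top can restore it.

```rust
// Toy model: a "plan" is reduced to the column names it produces.
// These helpers are illustrative assumptions, not DataFusion's `LogicalPlan` API.

/// Output columns of a join: left columns followed by right columns.
fn join_output(left: &[&str], right: &[&str]) -> Vec<String> {
    left.iter().chain(right.iter()).map(|c| c.to_string()).collect()
}

/// Selects the columns named in `wanted`, in that order, like a projection by name.
fn project(columns: &[String], wanted: &[String]) -> Vec<String> {
    wanted
        .iter()
        .filter(|w| columns.contains(w))
        .cloned()
        .collect()
}

fn main() {
    let lineitem = ["l_orderkey", "l_shipmode"];
    let orders = ["o_orderkey", "o_orderpriority"];

    // Column order the user would see without the optimization.
    let original = join_output(&lineitem, &orders);

    // After swapping the two inputs, the raw join output is ordered differently.
    let swapped = join_output(&orders, &lineitem);
    assert_ne!(original, swapped);

    // A projection on top of the swapped join restores the original order.
    let restored = project(&swapped, &original);
    assert_eq!(restored, original);
    println!("{:?}", restored);
}
```

Whether the extra projection is needed in the actual rule depends on how the join's `schema` is used downstream, which this sketch does not model.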







[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545977453



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {

Review comment:
       I filed https://issues.apache.org/jira/browse/ARROW-10964 for "Optimize nested joins" and referenced this discussion.







[GitHub] [arrow] codecov-io edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748106185


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=h1) Report
   > Merging [#8961](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=desc) (da7a858) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **decrease** coverage by `0.08%`.
   > The diff coverage is `33.70%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8961/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8961      +/-   ##
   ==========================================
   - Coverage   83.25%   83.17%   -0.09%     
   ==========================================
     Files         196      197       +1     
     Lines       48116    48276     +160     
   ==========================================
   + Hits        40059    40153      +94     
   - Misses       8057     8123      +66     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `89.95% <ø> (ø)` | |
   | [rust/datafusion/src/logical\_plan/plan.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9sb2dpY2FsX3BsYW4vcGxhbi5ycw==) | `87.93% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfdXRpbHMucnM=) | `97.10% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BsYW5uZXIucnM=) | `80.54% <ø> (ø)` | |
   | [rust/datafusion/src/sql/parser.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGFyc2VyLnJz) | `86.87% <ø> (ø)` | |
   | [...datafusion/src/optimizer/hash\_build\_probe\_order.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvaGFzaF9idWlsZF9wcm9iZV9vcmRlci5ycw==) | `29.76% <29.76%> (ø)` | |
   | [...t/datafusion/src/optimizer/projection\_push\_down.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvcHJvamVjdGlvbl9wdXNoX2Rvd24ucnM=) | `97.68% <100.00%> (ø)` | |
   | [rust/datafusion/src/optimizer/utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvdXRpbHMucnM=) | `61.75% <100.00%> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.09% <100.00%> (ø)` | |
   | [rust/datafusion/src/sql/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGxhbm5lci5ycw==) | `81.72% <100.00%> (ø)` | |
   | ... and [12 more](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=footer). Last update [d65ba4e...da7a858](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545899919



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {

Review comment:
       ~We should also cover `Join` here too? Since we only support inner joins for now, the output row count will at most be the number of rows of the smaller input.~
   
   I should learn to not do these reviews before my first cup of coffee. This is complete nonsense of course.
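
A small counterexample shows why the struck-through claim does not hold: with duplicate join keys, an inner join can produce more rows than either input. The function below is a hypothetical helper written for this illustration, not code from the PR.

```rust
use std::collections::HashMap;

/// Counts the rows produced by an inner equi-join on two key columns.
fn inner_join_row_count(left_keys: &[i32], right_keys: &[i32]) -> usize {
    // Build phase: count occurrences of each key on the left side.
    let mut build: HashMap<i32, usize> = HashMap::new();
    for key in left_keys {
        *build.entry(*key).or_insert(0) += 1;
    }
    // Probe phase: each right row matches every left row with the same key.
    right_keys
        .iter()
        .map(|key| build.get(key).copied().unwrap_or(0))
        .sum()
}

fn main() {
    // Three rows on each side, all sharing the same key:
    // 3 x 3 = 9 output rows, more than either input.
    assert_eq!(inner_join_row_count(&[1, 1, 1], &[1, 1, 1]), 9);

    // With unique keys the output is bounded by the smaller side.
    assert_eq!(inner_join_row_count(&[1, 2, 3], &[2, 3]), 2);
}
```

This is why `get_num_rows` cannot report an exact count for a `Join` from input row counts alone.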







[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545902311



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                if should_swap_join_order(left, right) {
+                    // Swap left and right, change join type and (equi-)join key order
+                    Ok(LogicalPlan::Join {
+                        left: right.clone(),
+                        right: left.clone(),
+                        on: on
+                            .iter()
+                            .map(|(l, r)| (r.to_string(), l.to_string()))

Review comment:
       Makes sense; I was surprised that I had to. Currently the join fails when you change the order of the inputs (e.g. directly in a query) without also changing the key order.
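
The swap described in the diff (sides, join type and key order together) can be sketched as follows. The `Join` struct and the three-variant `JoinType` here are simplified, hypothetical stand-ins for illustration and do not match DataFusion's actual definitions.

```rust
/// Hypothetical, simplified join type; not DataFusion's `JoinType`.
#[derive(Debug, Clone, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
}

/// Hypothetical, simplified join node; inputs are referred to by name only.
#[derive(Debug, Clone, PartialEq)]
struct Join {
    left: String,
    right: String,
    /// Pairs of (left key, right key) for the equi-join condition.
    on: Vec<(String, String)>,
    join_type: JoinType,
}

/// Swaps the inputs and keeps the join semantically equivalent by also
/// flipping each key pair and mirroring the join type.
fn swap_sides(join: &Join) -> Join {
    Join {
        left: join.right.clone(),
        right: join.left.clone(),
        on: join
            .on
            .iter()
            .map(|(l, r)| (r.clone(), l.clone()))
            .collect(),
        join_type: match join.join_type {
            JoinType::Inner => JoinType::Inner,
            JoinType::Left => JoinType::Right,
            JoinType::Right => JoinType::Left,
        },
    }
}

fn main() {
    let join = Join {
        left: "lineitem".to_string(),
        right: "orders".to_string(),
        on: vec![("l_orderkey".to_string(), "o_orderkey".to_string())],
        join_type: JoinType::Left,
    };
    let swapped = swap_sides(&join);
    assert_eq!(swapped.left, "orders");
    assert_eq!(swapped.on[0].0, "o_orderkey");
    assert_eq!(swapped.join_type, JoinType::Right);
    // Swapping twice gives back the original join.
    assert_eq!(swap_sides(&swapped), join);
}
```

Keeping the three changes in one function avoids the failure mode mentioned above, where the inputs are swapped but the key pairs still refer to the old sides.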







[GitHub] [arrow] Dandandan edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748437288


   I tested this after merging the other PR https://github.com/apache/arrow/pull/8965, which improves the join implementation.
   
   With that change the join is much faster regardless of this PR, and reordering gives a further ~15% reduction in run time for the following query (6001214 rows on the left vs 1499999 rows on the right):
   
   ```
   select
       l_shipmode,
       sum(case
           when o_orderpriority = '1-URGENT'
               or o_orderpriority = '2-HIGH'
               then 1
           else 0
       end) as high_line_count,
       sum(case
           when o_orderpriority <> '1-URGENT'
               and o_orderpriority <> '2-HIGH'
               then 1
           else 0
       end) as low_line_count
   from
       lineitem
   join
       orders
   on
       l_orderkey = o_orderkey
   group by
       l_shipmode
   order by
       l_shipmode;
   ```
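
For the row counts quoted above, the swap decision can be traced with a minimal sketch that mirrors the `should_swap_join_order` logic and the `Limit` arm of `get_num_rows` from the diff. It works on plain `Option<usize>` values instead of a `LogicalPlan`; the function names `should_swap` and `rows_after_limit` are assumptions made for this example.

```rust
/// Returns true only when both row counts are known and the left (build)
/// side is larger than the right (probe) side.
fn should_swap(left_rows: Option<usize>, right_rows: Option<usize>) -> bool {
    match (left_rows, right_rows) {
        (Some(l), Some(r)) => l > r,
        _ => false,
    }
}

/// Row count after a `limit`: the smaller of the limit and the input count,
/// or unknown if the input count is unknown.
fn rows_after_limit(input_rows: Option<usize>, limit: usize) -> Option<usize> {
    input_rows.map(|rows| rows.min(limit))
}

fn main() {
    let lineitem = Some(6_001_214);
    let orders = Some(1_499_999);

    // lineitem (left) is larger, so the rule would build on orders instead.
    assert!(should_swap(lineitem, orders));
    // Already in the preferred order: nothing to do.
    assert!(!should_swap(orders, lineitem));
    // Unknown statistics on either side leave the query order untouched.
    assert!(!should_swap(lineitem, None));
    // A limit below the right-hand row count makes the left side the smaller one.
    assert!(!should_swap(rows_after_limit(lineitem, 100), orders));
}
```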





[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545920907



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                if should_swap_join_order(left, right) {
+                    // Swap left and right, change join type and (equi-)join key order
+                    Ok(LogicalPlan::Join {

Review comment:
       Isn't the order explicit in the `schema` (which is the same) or will it be changed based on swapping left and right?







[GitHub] [arrow] andygrove commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748639459


   @Dandandan This needs rebasing - I tried merging into master locally before merging this and got some compilation errors.
   
   ```
   error[E0050]: method `scan` has 3 parameters but the declaration in trait `datasource::datasource::TableProvider::scan` has 4
      --> datafusion/src/optimizer/hash_build_probe_order.rs:179:13
       |
   179 | /             &self,
   180 | |             _projection: &Option<Vec<usize>>,
   181 | |             _batch_size: usize,
       | |______________________________^ expected 4 parameters, found 3
       | 
      ::: datafusion/src/datasource/datasource.rs:66:9
       |
   66  | /         &self,
   67  | |         projection: &Option<Vec<usize>>,
   68  | |         batch_size: usize,
   69  | |         filters: &[Expr],
       | |________________________- trait requires 4 parameters
   
   error: cannot construct `plan::LogicalPlan` with struct literal syntax due to inaccessible fields
      --> datafusion/src/optimizer/hash_build_probe_order.rs:204:23
       |
   204 |         let lp_left = LogicalPlan::TableScan {
       |                       ^^^^^^^^^^^^^^^^^^^^^^
   
   error: cannot construct `plan::LogicalPlan` with struct literal syntax due to inaccessible fields
      --> datafusion/src/optimizer/hash_build_probe_order.rs:211:24
       |
   211 |         let lp_right = LogicalPlan::TableScan {
       |                        ^^^^^^^^^^^^^^^^^^^^^^
   
   ```
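
The first error (E0050) means the test-only `TableProvider` in this branch still implements the old three-parameter `scan`, while the trait on master has gained a fourth `filters` parameter. A minimal sketch of the fix follows. `Expr`, the trait body and the `usize` return type are simplified stand-ins assumed for illustration, not the real DataFusion definitions; only the parameter list is taken from the compiler output above.

```rust
// Hypothetical stand-in for DataFusion's expression type.
pub struct Expr;

// Simplified stand-in for the trait; the real `scan` returns an execution
// plan, here it returns a row count so the sketch stays self-contained.
pub trait TableProvider {
    fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
        filters: &[Expr],
    ) -> usize;
}

// Test-only provider, analogous to the one defined in the PR's test module.
pub struct TestTableProvider {
    pub num_rows: usize,
}

impl TableProvider for TestTableProvider {
    fn scan(
        &self,
        _projection: &Option<Vec<usize>>,
        _batch_size: usize,
        _filters: &[Expr], // the parameter the stale implementation was missing
    ) -> usize {
        self.num_rows
    }
}
```

The unused parameter only needs to be accepted, not acted on, for the implementation to match the trait again.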





[GitHub] [arrow] github-actions[bot] commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748094818


   https://issues.apache.org/jira/browse/ARROW-10885





[GitHub] [arrow] seddonm1 commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
seddonm1 commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748360762


   This is awesome work @Dandandan 👍 





[GitHub] [arrow] Dandandan commented on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748640314


   @andygrove thanks, will do. Will also enable it now that #8965 is merged.





[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545948711



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {

Review comment:
       I have been thinking about what we can do to estimate the number of rows coming out of joins so that we can extend this optimization to nested joins. We can't do anything accurate with the current statistics in this case but I feel that we should try and do something rather than just pick the left side as the build side.
   
   One idea is to assume that all joins produce a cartesian product (left row count * right row count). This would at least help in the case where two small tables are joined, and then joined with a huge table, or the other way around.
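
The cartesian-product idea can be sketched as follows. This is only an illustration under stated assumptions: `Plan` is a hypothetical, heavily simplified stand-in for `LogicalPlan`, and `estimate_num_rows` and `should_swap` are made-up names, not functions from this PR. The estimate for a join is an upper bound, not an exact count.

```rust
// Hypothetical, simplified plan type; the real `LogicalPlan` has many more
// variants and carries schemas and expressions.
pub enum Plan {
    // Leaf whose exact row count may or may not be known from statistics.
    Scan { num_rows: Option<usize> },
    Limit { n: usize, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

// Returns an upper bound on the number of rows produced by `plan`,
// or `None` if any input has unknown statistics.
pub fn estimate_num_rows(plan: &Plan) -> Option<usize> {
    match plan {
        Plan::Scan { num_rows } => *num_rows,
        Plan::Limit { n, input } => estimate_num_rows(input).map(|rows| rows.min(*n)),
        Plan::Join { left, right } => {
            let l = estimate_num_rows(left)?;
            let r = estimate_num_rows(right)?;
            // Cartesian-product assumption; saturate rather than overflow.
            Some(l.saturating_mul(r))
        }
    }
}

// Swap only when both sides have an estimate and the left one is larger.
pub fn should_swap(left: &Plan, right: &Plan) -> bool {
    match (estimate_num_rows(left), estimate_num_rows(right)) {
        (Some(l), Some(r)) => l > r,
        _ => false,
    }
}
```

With this sketch, a join of two tables with 10 and 20 rows is estimated at 200 rows, so it would be chosen as the build side when joined against a table of a million rows.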







[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545902311



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                if should_swap_join_order(left, right) {
+                    // Swap left and right, change join type and (equi-)join key order
+                    Ok(LogicalPlan::Join {
+                        left: right.clone(),
+                        right: left.clone(),
+                        on: on
+                            .iter()
+                            .map(|(l, r)| (r.to_string(), l.to_string()))

Review comment:
       Makes sense. I was surprised that I had to do this. Currently the join fails when the key order is changed (e.g. when the keys are already written in the other order in the query).







[GitHub] [arrow] codecov-io edited a comment on pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#issuecomment-748106185


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=h1) Report
   > Merging [#8961](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=desc) (d9ffa00) into [master](https://codecov.io/gh/apache/arrow/commit/d65ba4ec5daeb93ca5031f883d08d559b68753b2?el=desc) (d65ba4e) will **decrease** coverage by `0.08%`.
   > The diff coverage is `32.96%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8961      +/-   ##
   ==========================================
   - Coverage   83.25%   83.16%   -0.09%     
   ==========================================
     Files         196      197       +1     
     Lines       48116    48278     +162     
   ==========================================
   + Hits        40059    40152      +93     
   - Misses       8057     8126      +69     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `89.95% <ø> (ø)` | |
   | [rust/datafusion/src/logical\_plan/plan.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9sb2dpY2FsX3BsYW4vcGxhbi5ycw==) | `87.93% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfdXRpbHMucnM=) | `97.10% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3BsYW5uZXIucnM=) | `80.54% <ø> (ø)` | |
   | [rust/datafusion/src/sql/parser.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGFyc2VyLnJz) | `86.87% <ø> (ø)` | |
   | [...datafusion/src/optimizer/hash\_build\_probe\_order.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvaGFzaF9idWlsZF9wcm9iZV9vcmRlci5ycw==) | `29.06% <29.06%> (ø)` | |
   | [...t/datafusion/src/optimizer/projection\_push\_down.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvcHJvamVjdGlvbl9wdXNoX2Rvd24ucnM=) | `97.68% <100.00%> (ø)` | |
   | [rust/datafusion/src/optimizer/utils.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9vcHRpbWl6ZXIvdXRpbHMucnM=) | `61.75% <100.00%> (ø)` | |
   | [rust/datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2hhc2hfam9pbi5ycw==) | `92.09% <100.00%> (ø)` | |
   | [rust/datafusion/src/sql/planner.rs](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9zcWwvcGxhbm5lci5ycw==) | `81.72% <100.00%> (ø)` | |
   | ... and [13 more](https://codecov.io/gh/apache/arrow/pull/8961/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=footer). Last update [d65ba4e...d9ffa00](https://codecov.io/gh/apache/arrow/pull/8961?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545899919



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {

Review comment:
       Should we cover `Join` here too? Since we only support inner joins for now, the output row count will at most be the number of rows of the smaller input.







[GitHub] [arrow] andygrove commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545901341



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                if should_swap_join_order(left, right) {
+                    // Swap left and right, change join type and (equi-)join key order
+                    Ok(LogicalPlan::Join {
+                        left: right.clone(),
+                        right: left.clone(),
+                        on: on
+                            .iter()
+                            .map(|(l, r)| (r.to_string(), l.to_string()))

Review comment:
       In theory, this is unnecessary, since SQL places no restrictions on the order of join conditions. However, it is possible that we make some assumptions about it, so if you run into that it would be good to file an issue for it.
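
For reference, the swap being discussed can be sketched in isolation as below. The `JoinType` enum here is a hypothetical local copy assumed for illustration (the real one lives in DataFusion), and `swap_join_type` and `swap_join_keys` are made-up helper names, not functions from this PR.

```rust
// Hypothetical local copy of a join-type enum, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
}

// When the inputs trade places, a left join becomes a right join and
// vice versa; an inner join is symmetric and stays as it is.
pub fn swap_join_type(join_type: JoinType) -> JoinType {
    match join_type {
        JoinType::Inner => JoinType::Inner,
        JoinType::Left => JoinType::Right,
        JoinType::Right => JoinType::Left,
    }
}

// Each (left_key, right_key) pair is flipped so that the first element
// still names a column of the new left input.
pub fn swap_join_keys(on: &[(String, String)]) -> Vec<(String, String)> {
    on.iter().map(|(l, r)| (r.clone(), l.clone())).collect()
}
```

If the planner did not assume that the first key of each pair belongs to the left input, `swap_join_keys` would be unnecessary and only the inputs and the join type would need to change.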







[GitHub] [arrow] Dandandan commented on a change in pull request #8961: ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8961:
URL: https://github.com/apache/arrow/pull/8961#discussion_r545960387



##########
File path: rust/datafusion/src/optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {

Review comment:
       Indeed, I think that could be very beneficial, but estimating it before executing might be really hard, or even impossible?
   
   Also, if using the left side as the build side is wrong, the user can currently change the order by rewriting the query itself. That option is lost with a heuristic that can be wrong.
   
   I think ideally you should be able to know more about the table size while the query is executing (a la Spark 3 adaptive query execution), so you don't do the wrong thing. BigQuery also has a nice strategy / explanation for this: https://cloud.google.com/bigquery/query-plan-explanation. This probably requires quite a few changes on the execution / planning side, but it would make many more statistics available at each step during execution, so the plan can be optimized further.
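
To make the run-time idea concrete, here is a toy sketch that picks the build side only after both inputs have been materialized and their sizes are known. It is an assumption-laden illustration, not how DataFusion, Spark or BigQuery implement it: `adaptive_hash_join` is a made-up name, and the inputs are plain in-memory slices of `(key, value)` pairs rather than record batches.

```rust
use std::collections::HashMap;

// Inner-joins two in-memory inputs on their `u32` key and returns
// (left value, right value) pairs. The smaller input is used to build
// the hash table; the larger one is streamed through it.
pub fn adaptive_hash_join(
    left: &[(u32, String)],
    right: &[(u32, String)],
) -> Vec<(String, String)> {
    // Decision made from observed sizes, not from planning-time statistics.
    let left_is_build = left.len() <= right.len();
    let (build, probe) = if left_is_build { (left, right) } else { (right, left) };

    // Build phase: key -> all values with that key on the build side.
    let mut table: HashMap<u32, Vec<&String>> = HashMap::new();
    for (key, value) in build {
        table.entry(*key).or_default().push(value);
    }

    // Probe phase: look up each probe row and emit one output row per match.
    let mut out = Vec::new();
    for (key, probe_value) in probe {
        if let Some(matches) = table.get(key) {
            for build_value in matches {
                // Restore the original (left, right) column order.
                if left_is_build {
                    out.push(((*build_value).clone(), probe_value.clone()));
                } else {
                    out.push((probe_value.clone(), (*build_value).clone()));
                }
            }
        }
    }
    out
}
```

The caller sees the same column order regardless of which side was used for the build phase, which is the property the logical-plan rewrite in this PR also has to preserve.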
   



