You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/21 17:13:17 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4219: [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the Join Type and available statistics

alamb commented on code in PR #4219:
URL: https://github.com/apache/arrow-datafusion/pull/4219#discussion_r1028295969


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1228,6 +1228,11 @@ pub struct SessionConfig {
     pub collect_statistics: bool,
     /// Should DataFusion optimizer run a top down process to reorder the join keys
     pub top_down_join_key_reordering: bool,
+    /// Should DataFusion optimizer prefer HashJoin over SortMergeJoin.
+    /// HashJoin can work more efficently than SortMergeJoin but consumes more memory.
+    pub prefer_hash_join: bool,

Review Comment:
   What would you think about moving these new settings into ConfigOptions (where they are visible via `SHOW` and automatically documented)?



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make

Review Comment:
   I like this design with `PartitionMode::Auto`



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
+/// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
+/// based on the available statistics that the inputs have.
+/// If the statistics information is not available, the partition mode will fall back to [PartitionMode::Partitioned].
+///
+/// JoinSelection rule will also reorder the build and probe phase of the hash joins
+/// based on the avaliable statistics that the inputs have.
+/// The rule optimizes the order such that the left (build) side of the join is the smallest.
+/// If the statistics information is not available, the order stays the same as the original query.
+/// JoinSelection rule will also swap the left and right sides for cross join to keep the left side
+/// is the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO we need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times.
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
+    // Get the left and right table's total bytes
+    // If both the left and right tables contain total_byte_size statistics,
+    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
+    let (left_size, right_size) = match (
+        left.statistics().total_byte_size,
+        right.statistics().total_byte_size,
+    ) {
+        (Some(l), Some(r)) => (Some(l), Some(r)),
+        _ => (left.statistics().num_rows, right.statistics().num_rows),
+    };
+
+    match (left_size, right_size) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    // Currently we do not trust the 0 value from stats, due to stats collection might have bug
+    // TODO check the logic in datasource::get_statistics_with_limit()

Review Comment:
   is this TODO worth tracking with a ticket?



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
+/// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
+/// based on the available statistics that the inputs have.
+/// If the statistics information is not available, the partition mode will fall back to [PartitionMode::Partitioned].
+///
+/// JoinSelection rule will also reorder the build and probe phase of the hash joins
+/// based on the avaliable statistics that the inputs have.
+/// The rule optimizes the order such that the left (build) side of the join is the smallest.
+/// If the statistics information is not available, the order stays the same as the original query.
+/// JoinSelection rule will also swap the left and right sides for cross join to keep the left side
+/// is the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO we need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times.

Review Comment:
   the prestosql approach makes sense to me



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
+/// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
+/// based on the available statistics that the inputs have.
+/// If the statistics information is not available, the partition mode will fall back to [PartitionMode::Partitioned].
+///
+/// JoinSelection rule will also reorder the build and probe phase of the hash joins
+/// based on the avaliable statistics that the inputs have.
+/// The rule optimizes the order such that the left (build) side of the join is the smallest.
+/// If the statistics information is not available, the order stays the same as the original query.
+/// JoinSelection rule will also swap the left and right sides for cross join to keep the left side
+/// is the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO we need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times.
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
+    // Get the left and right table's total bytes
+    // If both the left and right tables contain total_byte_size statistics,
+    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
+    let (left_size, right_size) = match (
+        left.statistics().total_byte_size,
+        right.statistics().total_byte_size,
+    ) {
+        (Some(l), Some(r)) => (Some(l), Some(r)),
+        _ => (left.statistics().num_rows, right.statistics().num_rows),
+    };
+
+    match (left_size, right_size) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    // Currently we do not trust the 0 value from stats, due to stats collection might have bug
+    // TODO check the logic in datasource::get_statistics_with_limit()
+    if let Some(size) = plan.statistics().total_byte_size {
+        size != 0 && size < collection_size_threshold
+    } else if let Some(row_count) = plan.statistics().num_rows {
+        row_count != 0 && row_count < collection_size_threshold
+    } else {
+        false
+    }
+}
+
+fn supports_swap(join_type: JoinType) -> bool {
+    match join_type {
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::Right
+        | JoinType::Full
+        | JoinType::LeftSemi
+        | JoinType::RightSemi
+        | JoinType::LeftAnti
+        | JoinType::RightAnti => true,
+    }
+}
+
+fn swap_join_type(join_type: JoinType) -> JoinType {
+    match join_type {
+        JoinType::Inner => JoinType::Inner,
+        JoinType::Full => JoinType::Full,
+        JoinType::Left => JoinType::Right,
+        JoinType::Right => JoinType::Left,
+        JoinType::LeftSemi => JoinType::RightSemi,
+        JoinType::RightSemi => JoinType::LeftSemi,
+        JoinType::LeftAnti => JoinType::RightAnti,
+        JoinType::RightAnti => JoinType::LeftAnti,
+    }
+}
+
+fn swap_hash_join(
+    hash_join: &HashJoinExec,
+    partition_mode: PartitionMode,
+    left: &Arc<dyn ExecutionPlan>,
+    right: &Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let new_join = HashJoinExec::try_new(
+        Arc::clone(right),
+        Arc::clone(left),
+        hash_join
+            .on()
+            .iter()
+            .map(|(l, r)| (r.clone(), l.clone()))
+            .collect(),
+        swap_join_filter(hash_join.filter()),
+        &swap_join_type(*hash_join.join_type()),
+        partition_mode,
+        hash_join.null_equals_null(),
+    )?;
+    if matches!(
+        hash_join.join_type(),
+        JoinType::LeftSemi
+            | JoinType::RightSemi
+            | JoinType::LeftAnti
+            | JoinType::RightAnti
+    ) {
+        Ok(Arc::new(new_join))
+    } else {
+        // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+        let proj = ProjectionExec::try_new(
+            swap_reverting_projection(&left.schema(), &right.schema()),
+            Arc::new(new_join),
+        )?;
+        Ok(Arc::new(proj))
+    }
+}
+
+/// When the order of the join is changed by the optimizer,
+/// the columns in the output should not be impacted.
+/// This helper creates the expressions that will allow to swap
+/// back the values from the original left as first columns and
+/// those on the right next
+fn swap_reverting_projection(
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
+    let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+    let right_len = right_cols.len();
+    let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), right_len + i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+
+    left_cols.chain(right_cols).collect()
+}
+
+/// Swaps join sides for filter column indices and produces new JoinFilter
+fn swap_join_filter(filter: &Option<JoinFilter>) -> Option<JoinFilter> {
+    match filter {
+        Some(filter) => {
+            let column_indices = filter
+                .column_indices()
+                .iter()
+                .map(|idx| {
+                    let side = if matches!(idx.side, JoinSide::Left) {
+                        JoinSide::Right
+                    } else {
+                        JoinSide::Left
+                    };
+                    ColumnIndex {
+                        index: idx.index,
+                        side,
+                    }
+                })
+                .collect();
+
+            Some(JoinFilter::new(
+                filter.expression().clone(),
+                column_indices,
+                filter.schema().clone(),
+            ))
+        }
+        None => None,
+    }
+}
+
+impl PhysicalOptimizerRule for JoinSelection {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        session_config: &SessionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let collect_left_threshold = session_config.hash_join_collect_left_threshold;
+        plan.transform_up(&|plan| {
+            if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+                match hash_join.partition_mode() {
+                    PartitionMode::Auto => {
+                        try_collect_left(hash_join, Some(collect_left_threshold))
+                            .unwrap()
+                            .or_else(|| Some(partitioned_hash_join(hash_join).unwrap()))

Review Comment:
   Why does this code (and below) use `unwrap`? I think they should return an error rather than `panic`ing



##########
datafusion/core/src/execution/context.rs:
##########
@@ -1228,6 +1228,11 @@ pub struct SessionConfig {
     pub collect_statistics: bool,
     /// Should DataFusion optimizer run a top down process to reorder the join keys
     pub top_down_join_key_reordering: bool,
+    /// Should DataFusion optimizer prefer HashJoin over SortMergeJoin.
+    /// HashJoin can work more efficently than SortMergeJoin but consumes more memory.
+    pub prefer_hash_join: bool,
+    /// The maximum estimated size in bytes for the left input a hash join will be collected into one partition
+    pub hash_join_collect_left_threshold: usize,

Review Comment:
   I recommend calling this seetting something that doesn't have 'left' and 'right' as that can get confusing.
   
   How about `hash_join_single_partition_threshold`?



##########
datafusion/core/src/physical_plan/joins/mod.rs:
##########
@@ -29,6 +29,9 @@ pub enum PartitionMode {
     Partitioned,
     /// Left side will collected into one partition
     CollectLeft,
+    /// When set to Auto, DataFusion optimizer will decide which PartitionMode mode(Partitioned/CollectLeft) is optimal based on statistics.
+    /// It will also consider swapping the left and right inputs for the Join

Review Comment:
   The optimizer also will swap the inputs for Partitioned and CollectLeft mode too, right? As written, this comment could be confusing and imply that inputs will only be swapped if the mode is set to Auto



##########
datafusion/core/src/execution/context.rs:
##########
@@ -1228,6 +1228,11 @@ pub struct SessionConfig {
     pub collect_statistics: bool,
     /// Should DataFusion optimizer run a top down process to reorder the join keys
     pub top_down_join_key_reordering: bool,
+    /// Should DataFusion optimizer prefer HashJoin over SortMergeJoin.
+    /// HashJoin can work more efficently than SortMergeJoin but consumes more memory.

Review Comment:
   ```suggestion
       /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory. Defaults to true
   ```



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data

Review Comment:
   this comment seems incorrect for this module



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
+/// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
+/// based on the available statistics that the inputs have.
+/// If the statistics information is not available, the partition mode will fall back to [PartitionMode::Partitioned].
+///
+/// JoinSelection rule will also reorder the build and probe phase of the hash joins
+/// based on the avaliable statistics that the inputs have.
+/// The rule optimizes the order such that the left (build) side of the join is the smallest.
+/// If the statistics information is not available, the order stays the same as the original query.
+/// JoinSelection rule will also swap the left and right sides for cross join to keep the left side
+/// is the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO we need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times.
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
+    // Get the left and right table's total bytes
+    // If both the left and right tables contain total_byte_size statistics,
+    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
+    let (left_size, right_size) = match (
+        left.statistics().total_byte_size,
+        right.statistics().total_byte_size,
+    ) {
+        (Some(l), Some(r)) => (Some(l), Some(r)),
+        _ => (left.statistics().num_rows, right.statistics().num_rows),
+    };
+
+    match (left_size, right_size) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    // Currently we do not trust the 0 value from stats, due to stats collection might have bug
+    // TODO check the logic in datasource::get_statistics_with_limit()
+    if let Some(size) = plan.statistics().total_byte_size {
+        size != 0 && size < collection_size_threshold
+    } else if let Some(row_count) = plan.statistics().num_rows {
+        row_count != 0 && row_count < collection_size_threshold
+    } else {
+        false
+    }
+}
+
+fn supports_swap(join_type: JoinType) -> bool {
+    match join_type {
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::Right
+        | JoinType::Full
+        | JoinType::LeftSemi
+        | JoinType::RightSemi
+        | JoinType::LeftAnti
+        | JoinType::RightAnti => true,

Review Comment:
   I am surprised we can swap the inputs to the various outer and semi join types without changing the query's output 🤔 



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
+/// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
+/// based on the available statistics that the inputs have.
+/// If the statistics information is not available, the partition mode will fall back to [PartitionMode::Partitioned].
+///
+/// JoinSelection rule will also reorder the build and probe phase of the hash joins
+/// based on the avaliable statistics that the inputs have.
+/// The rule optimizes the order such that the left (build) side of the join is the smallest.
+/// If the statistics information is not available, the order stays the same as the original query.
+/// JoinSelection rule will also swap the left and right sides for cross join to keep the left side
+/// is the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO we need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times.
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
+    // Get the left and right table's total bytes
+    // If both the left and right tables contain total_byte_size statistics,
+    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
+    let (left_size, right_size) = match (
+        left.statistics().total_byte_size,
+        right.statistics().total_byte_size,
+    ) {
+        (Some(l), Some(r)) => (Some(l), Some(r)),
+        _ => (left.statistics().num_rows, right.statistics().num_rows),
+    };
+
+    match (left_size, right_size) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    // Currently we do not trust the 0 value from stats, due to stats collection might have bug
+    // TODO check the logic in datasource::get_statistics_with_limit()
+    if let Some(size) = plan.statistics().total_byte_size {
+        size != 0 && size < collection_size_threshold
+    } else if let Some(row_count) = plan.statistics().num_rows {
+        row_count != 0 && row_count < collection_size_threshold
+    } else {
+        false
+    }
+}
+
+fn supports_swap(join_type: JoinType) -> bool {
+    match join_type {
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::Right
+        | JoinType::Full
+        | JoinType::LeftSemi
+        | JoinType::RightSemi
+        | JoinType::LeftAnti
+        | JoinType::RightAnti => true,
+    }
+}
+
+fn swap_join_type(join_type: JoinType) -> JoinType {
+    match join_type {
+        JoinType::Inner => JoinType::Inner,
+        JoinType::Full => JoinType::Full,
+        JoinType::Left => JoinType::Right,
+        JoinType::Right => JoinType::Left,
+        JoinType::LeftSemi => JoinType::RightSemi,
+        JoinType::RightSemi => JoinType::LeftSemi,
+        JoinType::LeftAnti => JoinType::RightAnti,
+        JoinType::RightAnti => JoinType::LeftAnti,
+    }
+}
+
+fn swap_hash_join(
+    hash_join: &HashJoinExec,
+    partition_mode: PartitionMode,
+    left: &Arc<dyn ExecutionPlan>,
+    right: &Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let new_join = HashJoinExec::try_new(
+        Arc::clone(right),
+        Arc::clone(left),
+        hash_join
+            .on()
+            .iter()
+            .map(|(l, r)| (r.clone(), l.clone()))
+            .collect(),
+        swap_join_filter(hash_join.filter()),
+        &swap_join_type(*hash_join.join_type()),
+        partition_mode,
+        hash_join.null_equals_null(),
+    )?;
+    if matches!(
+        hash_join.join_type(),
+        JoinType::LeftSemi
+            | JoinType::RightSemi
+            | JoinType::LeftAnti
+            | JoinType::RightAnti
+    ) {
+        Ok(Arc::new(new_join))
+    } else {
+        // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+        let proj = ProjectionExec::try_new(
+            swap_reverting_projection(&left.schema(), &right.schema()),
+            Arc::new(new_join),
+        )?;
+        Ok(Arc::new(proj))
+    }
+}
+
+/// When the order of the join is changed by the optimizer,
+/// the columns in the output should not be impacted.
+/// This helper creates the expressions that will allow to swap
+/// back the values from the original left as first columns and
+/// those on the right next
+fn swap_reverting_projection(
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
+    let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+    let right_len = right_cols.len();
+    let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), right_len + i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+
+    left_cols.chain(right_cols).collect()
+}
+
+/// Swaps join sides for filter column indices and produces new JoinFilter
+fn swap_join_filter(filter: &Option<JoinFilter>) -> Option<JoinFilter> {
+    match filter {
+        Some(filter) => {

Review Comment:
   Instead of `match filter` I think you can use `map` like:
   
   ```rust
   filter.map(|filter| {
     let column_indicies = ...
   ```
   
   Not critical, I just figured I woudl point it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org