Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/24 11:36:59 UTC

[arrow-datafusion] branch master updated: [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the Join Type and available statistics, option for SortMergeJoin (#4219)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 561be4fda [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the Join Type and available statistics, option for SortMergeJoin (#4219)
561be4fda is described below

commit 561be4fda2ed3abac038cf422cb7b03d443748de
Author: mingmwang <mi...@ebay.com>
AuthorDate: Thu Nov 24 19:36:53 2022 +0800

    [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the Join Type and available statistics, option for SortMergeJoin (#4219)
    
    * [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the available statistics
    
    * Fix HashJoin CollectLeft bug, refine UT to cover 'enable'/'disable' repartition_joins
    
    * add comments
    
    * ignore 0 stats
    
    * Resolve review comments, add intg UT for SMJ
    
    * fix conflicts
    
    * tiny fix to doc
    
    * refine swap_join_filter()
    
    * update configs.md
    
    * fix configs.md
---
 datafusion/core/src/config.rs                      |   27 +
 datafusion/core/src/execution/context.rs           |    9 +-
 .../core/src/physical_optimizer/enforcement.rs     |   13 +-
 .../physical_optimizer/hash_build_probe_order.rs   |  562 --------
 .../core/src/physical_optimizer/join_selection.rs  |  979 +++++++++++++
 datafusion/core/src/physical_optimizer/mod.rs      |    2 +-
 .../core/src/physical_plan/joins/hash_join.rs      |   13 +
 datafusion/core/src/physical_plan/joins/mod.rs     |    3 +
 .../src/physical_plan/joins/sort_merge_join.rs     |   15 +-
 datafusion/core/src/physical_plan/planner.rs       |   39 +-
 datafusion/core/tests/sql/information_schema.rs    |   39 +-
 datafusion/core/tests/sql/joins.rs                 | 1443 +++++++++++---------
 datafusion/core/tests/sql/mod.rs                   |  136 +-
 datafusion/core/tests/sql/references.rs            |    2 +-
 datafusion/core/tests/sql/wildcard.rs              |    4 +-
 datafusion/optimizer/README.md                     |    2 +-
 docs/source/user-guide/configs.md                  |   37 +-
 17 files changed, 2100 insertions(+), 1225 deletions(-)
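
For context, the three options introduced by this commit can be set on a
SessionConfig before planning. A minimal sketch, assuming the set_bool/set_u64
builder methods on SessionConfig from this era of the API and a hypothetical
registered table named `example`:

    use datafusion::config::{
        OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD, OPT_PREFER_HASH_JOIN,
    };
    use datafusion::prelude::{SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Prefer SortMergeJoin over HashJoin, and only collect a build side
        // into a single partition when it is estimated below 2 MiB.
        let config = SessionConfig::new()
            .set_bool(OPT_PREFER_HASH_JOIN, false)
            .set_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD, 2 * 1024 * 1024);
        let ctx = SessionContext::with_config(config);
        // `example` must be registered beforehand (hypothetical table).
        let df = ctx
            .sql("SELECT a.id FROM example a JOIN example b ON a.id = b.id")
            .await?;
        df.show().await?;
        Ok(())
    }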

diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 3b00f4ab1..620634f25 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -74,6 +74,17 @@ pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";
 /// Type of `TableProvider` to use when loading `default` schema
 pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";
 
+/// Configuration option "datafusion.optimizer.top_down_join_key_reordering"
+pub const OPT_TOP_DOWN_JOIN_KEY_REORDERING: &str =
+    "datafusion.optimizer.top_down_join_key_reordering";
+
+/// Configuration option "datafusion.optimizer.prefer_hash_join"
+pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
+
+/// Configuration option "datafusion.optimizer.hash_join_single_partition_threshold"
+pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
+    "datafusion.optimizer.hash_join_single_partition_threshold";
+
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identify this configuration option
@@ -266,6 +277,22 @@ impl BuiltInConfigs {
                 "Type of `TableProvider` to use when loading `default` schema. Defaults to None",
                 None,
             ),
+             ConfigDefinition::new_bool(
+                 OPT_TOP_DOWN_JOIN_KEY_REORDERING,
+                 "When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true",
+                 true,
+             ),
+             ConfigDefinition::new_bool(
+                 OPT_PREFER_HASH_JOIN,
+                 "When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently\
+                 than SortMergeJoin but consumes more memory. Defaults to true",
+                 true,
+             ),
+             ConfigDefinition::new_u64(
+                 OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD,
+                 "The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
+                 1024 * 1024,
+             ),
             ]
         }
     }
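
These registered options are looked up by key at optimization time. A small
sketch of the read path, mirroring the pattern the rules in this commit use
(the helper name is illustrative, not part of the commit):

    use datafusion::config::OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD;
    use datafusion::prelude::SessionConfig;

    // Fetch the CollectLeft size threshold, falling back to 0 when the
    // option is missing, just as JoinSelection does via unwrap_or_default().
    fn collect_left_threshold(config: &SessionConfig) -> usize {
        config
            .config_options()
            .read()
            .get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
            .unwrap_or_default() as usize
    }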
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index b793c9c68..736cf648f 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -26,8 +26,8 @@ use crate::{
     logical_expr::{PlanType, ToStringifiedPlan},
     optimizer::optimizer::Optimizer,
     physical_optimizer::{
-        aggregate_statistics::AggregateStatistics,
-        hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
+        aggregate_statistics::AggregateStatistics, join_selection::JoinSelection,
+        optimizer::PhysicalOptimizerRule,
     },
 };
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1166,8 +1166,6 @@ pub struct SessionConfig {
     pub parquet_pruning: bool,
     /// Should DataFusion collect statistics after listing files
     pub collect_statistics: bool,
-    /// Should DataFusion optimizer run a top down process to reorder the join keys
-    pub top_down_join_key_reordering: bool,
     /// Configuration options
     pub config_options: Arc<RwLock<ConfigOptions>>,
     /// Opaque extensions.
@@ -1187,7 +1185,6 @@ impl Default for SessionConfig {
             repartition_windows: true,
             parquet_pruning: true,
             collect_statistics: false,
-            top_down_join_key_reordering: true,
             config_options: Arc::new(RwLock::new(ConfigOptions::new())),
             // Assume no extensions by default.
             extensions: HashMap::with_capacity_and_hasher(
@@ -1508,7 +1505,7 @@ impl SessionState {
 
         let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
             Arc::new(AggregateStatistics::new()),
-            Arc::new(HashBuildProbeOrder::new()),
+            Arc::new(JoinSelection::new()),
         ];
         physical_optimizers.push(Arc::new(BasicEnforcement::new()));
         if config
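
Note the rule ordering above: JoinSelection replaces HashBuildProbeOrder and
runs before BasicEnforcement, so join sides and partition modes are settled
before distribution and ordering requirements are enforced. A hedged sketch of
applying the first rules in that order manually (module paths assumed public,
as they were for the rule this commit replaces):

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::context::SessionConfig;
    use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
    use datafusion::physical_optimizer::join_selection::JoinSelection;
    use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
    use datafusion::physical_plan::ExecutionPlan;

    // Apply AggregateStatistics, then JoinSelection, as SessionState does.
    fn run_first_rules(
        plan: Arc<dyn ExecutionPlan>,
        config: &SessionConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
            Arc::new(AggregateStatistics::new()),
            Arc::new(JoinSelection::new()),
        ];
        rules
            .iter()
            .try_fold(plan, |plan, rule| rule.optimize(plan, config))
    }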
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index f3f63fcb2..2a8252da8 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -18,6 +18,7 @@
 //! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
 //! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
 //!
+use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
@@ -72,7 +73,11 @@ impl PhysicalOptimizerRule for BasicEnforcement {
         config: &SessionConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let target_partitions = config.target_partitions;
-        let top_down_join_key_reordering = config.top_down_join_key_reordering;
+        let top_down_join_key_reordering = config
+            .config_options()
+            .read()
+            .get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
+            .unwrap_or_default();
         let new_plan = if top_down_join_key_reordering {
             // Run a top-down process to adjust input key ordering recursively
             let plan_requirements = PlanWithKeyRequirements::new(plan);
@@ -209,6 +214,12 @@ fn adjust_input_keys_ordering(
                     request_key_ordering: vec![None, new_right_request],
                 }))
             }
+            PartitionMode::Auto => {
+                // Cannot satisfy the requirement; clear the current requirements and generate new empty ones
+                Ok(Some(PlanWithKeyRequirements::new(
+                    requirements.plan.clone(),
+                )))
+            }
         }
     } else if let Some(CrossJoinExec { left, .. }) =
         plan_any.downcast_ref::<CrossJoinExec>()
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
deleted file mode 100644
index 7c6513499..000000000
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ /dev/null
@@ -1,562 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Utilizing exact statistics from sources to avoid scanning data
-use std::sync::Arc;
-
-use arrow::datatypes::Schema;
-
-use crate::execution::context::SessionConfig;
-use crate::logical_expr::JoinType;
-use crate::physical_plan::expressions::Column;
-use crate::physical_plan::joins::{
-    utils::{ColumnIndex, JoinFilter, JoinSide},
-    CrossJoinExec, HashJoinExec,
-};
-use crate::physical_plan::projection::ProjectionExec;
-use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
-
-use super::optimizer::PhysicalOptimizerRule;
-use super::utils::optimize_children;
-use crate::error::Result;
-
-/// BuildProbeOrder reorders the build and probe phase of
-/// hash joins. This uses the amount of rows that a datasource has.
-/// The rule optimizes the order such that the left (build) side of the join
-/// is the smallest.
-/// If the information is not available, the order stays the same,
-/// so that it could be optimized manually in a query.
-#[derive(Default)]
-pub struct HashBuildProbeOrder {}
-
-impl HashBuildProbeOrder {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
-    // Get the left and right table's total bytes
-    // If both the left and right tables contain total_byte_size statistics,
-    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
-    let (left_size, right_size) = match (
-        left.statistics().total_byte_size,
-        right.statistics().total_byte_size,
-    ) {
-        (Some(l), Some(r)) => (Some(l), Some(r)),
-        _ => (left.statistics().num_rows, right.statistics().num_rows),
-    };
-
-    match (left_size, right_size) {
-        (Some(l), Some(r)) => l > r,
-        _ => false,
-    }
-}
-
-fn supports_swap(join_type: JoinType) -> bool {
-    match join_type {
-        JoinType::Inner
-        | JoinType::Left
-        | JoinType::Right
-        | JoinType::Full
-        | JoinType::LeftSemi
-        | JoinType::RightSemi
-        | JoinType::LeftAnti
-        | JoinType::RightAnti => true,
-    }
-}
-
-fn swap_join_type(join_type: JoinType) -> JoinType {
-    match join_type {
-        JoinType::Inner => JoinType::Inner,
-        JoinType::Full => JoinType::Full,
-        JoinType::Left => JoinType::Right,
-        JoinType::Right => JoinType::Left,
-        JoinType::LeftSemi => JoinType::RightSemi,
-        JoinType::RightSemi => JoinType::LeftSemi,
-        JoinType::LeftAnti => JoinType::RightAnti,
-        JoinType::RightAnti => JoinType::LeftAnti,
-    }
-}
-
-/// When the order of the join is changed by the optimizer,
-/// the columns in the output should not be impacted.
-/// This helper creates the expressions that will allow to swap
-/// back the values from the original left as first columns and
-/// those on the right next
-fn swap_reverting_projection(
-    left_schema: &Schema,
-    right_schema: &Schema,
-) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
-    let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| {
-        (
-            Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>,
-            f.name().to_owned(),
-        )
-    });
-    let right_len = right_cols.len();
-    let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| {
-        (
-            Arc::new(Column::new(f.name(), right_len + i)) as Arc<dyn PhysicalExpr>,
-            f.name().to_owned(),
-        )
-    });
-
-    left_cols.chain(right_cols).collect()
-}
-
-/// Swaps join sides for filter column indices and produces new JoinFilter
-fn swap_join_filter(filter: &Option<JoinFilter>) -> Option<JoinFilter> {
-    match filter {
-        Some(filter) => {
-            let column_indices = filter
-                .column_indices()
-                .iter()
-                .map(|idx| {
-                    let side = if matches!(idx.side, JoinSide::Left) {
-                        JoinSide::Right
-                    } else {
-                        JoinSide::Left
-                    };
-                    ColumnIndex {
-                        index: idx.index,
-                        side,
-                    }
-                })
-                .collect();
-
-            Some(JoinFilter::new(
-                filter.expression().clone(),
-                column_indices,
-                filter.schema().clone(),
-            ))
-        }
-        None => None,
-    }
-}
-
-impl PhysicalOptimizerRule for HashBuildProbeOrder {
-    fn optimize(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        session_config: &SessionConfig,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let plan = optimize_children(self, plan, session_config)?;
-        if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
-            let left = hash_join.left();
-            let right = hash_join.right();
-            if should_swap_join_order(&**left, &**right)
-                && supports_swap(*hash_join.join_type())
-            {
-                let new_join = HashJoinExec::try_new(
-                    Arc::clone(right),
-                    Arc::clone(left),
-                    hash_join
-                        .on()
-                        .iter()
-                        .map(|(l, r)| (r.clone(), l.clone()))
-                        .collect(),
-                    swap_join_filter(hash_join.filter()),
-                    &swap_join_type(*hash_join.join_type()),
-                    *hash_join.partition_mode(),
-                    hash_join.null_equals_null(),
-                )?;
-                if matches!(
-                    hash_join.join_type(),
-                    JoinType::LeftSemi
-                        | JoinType::RightSemi
-                        | JoinType::LeftAnti
-                        | JoinType::RightAnti
-                ) {
-                    return Ok(Arc::new(new_join));
-                }
-
-                let proj = ProjectionExec::try_new(
-                    swap_reverting_projection(&left.schema(), &right.schema()),
-                    Arc::new(new_join),
-                )?;
-                return Ok(Arc::new(proj));
-            }
-        } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
-            let left = cross_join.left();
-            let right = cross_join.right();
-            if should_swap_join_order(&**left, &**right) {
-                let new_join =
-                    CrossJoinExec::try_new(Arc::clone(right), Arc::clone(left))?;
-                let proj = ProjectionExec::try_new(
-                    swap_reverting_projection(&left.schema(), &right.schema()),
-                    Arc::new(new_join),
-                )?;
-                return Ok(Arc::new(proj));
-            }
-        }
-        Ok(plan)
-    }
-
-    fn name(&self) -> &str {
-        "hash_build_probe_order"
-    }
-
-    fn schema_check(&self) -> bool {
-        true
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::{
-        physical_plan::{
-            displayable, joins::PartitionMode, ColumnStatistics, Statistics,
-        },
-        test::exec::StatisticsExec,
-    };
-
-    use super::*;
-    use std::sync::Arc;
-
-    use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion_common::ScalarValue;
-
-    fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
-        let big = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Some(10),
-                total_byte_size: Some(100000),
-                ..Default::default()
-            },
-            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
-        ));
-
-        let small = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Some(100000),
-                total_byte_size: Some(10),
-                ..Default::default()
-            },
-            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
-        ));
-        (big, small)
-    }
-
-    /// Create a column statistics vector for a single column
-    /// that has the given min/max/distinct_count properties.
-    ///
-    /// Given min/max will be mapped to a [`ScalarValue`] if
-    /// they are not `None`.
-    fn create_column_stats(
-        min: Option<u64>,
-        max: Option<u64>,
-        distinct_count: Option<usize>,
-    ) -> Option<Vec<ColumnStatistics>> {
-        Some(vec![ColumnStatistics {
-            distinct_count,
-            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
-            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
-            ..Default::default()
-        }])
-    }
-
-    /// Returns three plans with statistics of (min, max, distinct_count)
-    /// * big 100K rows @ (0, 50k, 50k)
-    /// * medium 10K rows @ (1k, 5k, 1k)
-    /// * small 1K rows @ (0, 100k, 1k)
-    fn create_nested_with_min_max() -> (
-        Arc<dyn ExecutionPlan>,
-        Arc<dyn ExecutionPlan>,
-        Arc<dyn ExecutionPlan>,
-    ) {
-        let big = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Some(100_000),
-                column_statistics: create_column_stats(
-                    Some(0),
-                    Some(50_000),
-                    Some(50_000),
-                ),
-                ..Default::default()
-            },
-            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
-        ));
-
-        let medium = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Some(10_000),
-                column_statistics: create_column_stats(
-                    Some(1000),
-                    Some(5000),
-                    Some(1000),
-                ),
-                ..Default::default()
-            },
-            Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]),
-        ));
-
-        let small = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Some(1000),
-                column_statistics: create_column_stats(
-                    Some(0),
-                    Some(100_000),
-                    Some(1000),
-                ),
-                ..Default::default()
-            },
-            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
-        ));
-
-        (big, medium, small)
-    }
-
-    #[tokio::test]
-    async fn test_join_with_swap() {
-        let (big, small) = create_big_and_small();
-
-        let join = HashJoinExec::try_new(
-            Arc::clone(&big),
-            Arc::clone(&small),
-            vec![(
-                Column::new_with_schema("big_col", &big.schema()).unwrap(),
-                Column::new_with_schema("small_col", &small.schema()).unwrap(),
-            )],
-            None,
-            &JoinType::Left,
-            PartitionMode::CollectLeft,
-            &false,
-        )
-        .unwrap();
-
-        let optimized_join = HashBuildProbeOrder::new()
-            .optimize(Arc::new(join), &SessionConfig::new())
-            .unwrap();
-
-        let swapping_projection = optimized_join
-            .as_any()
-            .downcast_ref::<ProjectionExec>()
-            .expect("A proj is required to swap columns back to their original order");
-
-        assert_eq!(swapping_projection.expr().len(), 2);
-        let (col, name) = &swapping_projection.expr()[0];
-        assert_eq!(name, "big_col");
-        assert_col_expr(col, "big_col", 1);
-        let (col, name) = &swapping_projection.expr()[1];
-        assert_eq!(name, "small_col");
-        assert_col_expr(col, "small_col", 0);
-
-        let swapped_join = swapping_projection
-            .input()
-            .as_any()
-            .downcast_ref::<HashJoinExec>()
-            .expect("The type of the plan should not be changed");
-
-        assert_eq!(swapped_join.left().statistics().total_byte_size, Some(10));
-        assert_eq!(
-            swapped_join.right().statistics().total_byte_size,
-            Some(100000)
-        );
-    }
-
-    #[tokio::test]
-    async fn test_join_with_swap_semi() {
-        let join_types = [JoinType::LeftSemi, JoinType::LeftAnti];
-        for join_type in join_types {
-            let (big, small) = create_big_and_small();
-
-            let join = HashJoinExec::try_new(
-                Arc::clone(&big),
-                Arc::clone(&small),
-                vec![(
-                    Column::new_with_schema("big_col", &big.schema()).unwrap(),
-                    Column::new_with_schema("small_col", &small.schema()).unwrap(),
-                )],
-                None,
-                &join_type,
-                PartitionMode::CollectLeft,
-                &false,
-            )
-            .unwrap();
-
-            let original_schema = join.schema();
-
-            let optimized_join = HashBuildProbeOrder::new()
-                .optimize(Arc::new(join), &SessionConfig::new())
-                .unwrap();
-
-            let swapped_join = optimized_join
-                .as_any()
-                .downcast_ref::<HashJoinExec>()
-                .expect(
-                    "A proj is not required to swap columns back to their original order",
-                );
-
-            assert_eq!(swapped_join.schema().fields().len(), 1);
-
-            assert_eq!(swapped_join.left().statistics().total_byte_size, Some(10));
-            assert_eq!(
-                swapped_join.right().statistics().total_byte_size,
-                Some(100000)
-            );
-
-            assert_eq!(original_schema, swapped_join.schema());
-        }
-    }
-
-    /// Compare the input plan with the plan after running the probe order optimizer.
-    macro_rules! assert_optimized {
-        ($EXPECTED_LINES: expr, $PLAN: expr) => {
-            let expected_lines =
-                $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
-
-            let optimized = HashBuildProbeOrder::new()
-                .optimize(Arc::new($PLAN), &SessionConfig::new())
-                .unwrap();
-
-            let plan = displayable(optimized.as_ref()).indent().to_string();
-            let actual_lines = plan.split("\n").collect::<Vec<&str>>();
-
-            assert_eq!(
-                &expected_lines, &actual_lines,
-                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-                expected_lines, actual_lines
-            );
-        };
-    }
-
-    #[tokio::test]
-    async fn test_nested_join_swap() {
-        let (big, medium, small) = create_nested_with_min_max();
-
-        // Form the inner join: big JOIN small
-        let child_join = HashJoinExec::try_new(
-            Arc::clone(&big),
-            Arc::clone(&small),
-            vec![(
-                Column::new_with_schema("big_col", &big.schema()).unwrap(),
-                Column::new_with_schema("small_col", &small.schema()).unwrap(),
-            )],
-            None,
-            &JoinType::Inner,
-            PartitionMode::CollectLeft,
-            &false,
-        )
-        .unwrap();
-        let child_schema = child_join.schema();
-
-        // Form join tree `medium LEFT JOIN (big JOIN small)`
-        let join = HashJoinExec::try_new(
-            Arc::clone(&medium),
-            Arc::new(child_join),
-            vec![(
-                Column::new_with_schema("medium_col", &medium.schema()).unwrap(),
-                Column::new_with_schema("small_col", &child_schema).unwrap(),
-            )],
-            None,
-            &JoinType::Left,
-            PartitionMode::CollectLeft,
-            &false,
-        )
-        .unwrap();
-
-        // Hash join uses the left side to build the hash table, and right side to probe it. We want
-        // to keep left as small as possible, so if we can estimate (with a reasonable margin of error)
-        // that the left side is smaller than the right side, we should swap the sides.
-        //
-        // The first hash join's left is 'small' table (with 1000 rows), and the second hash join's
-        // left is the F(small IJ big) which has an estimated cardinality of 2000 rows (vs medium which
-        // has an exact cardinality of 10_000 rows).
-        let expected = [
-            "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as big_col, small_col@1 as small_col]",
-            "  HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column { name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
-            "    ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as small_col]",
-            "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\", index: 0 })]",
-            "        StatisticsExec: col_count=1, row_count=Some(1000)",
-            "        StatisticsExec: col_count=1, row_count=Some(100000)",
-            "    StatisticsExec: col_count=1, row_count=Some(10000)",
-            ""
-        ];
-        assert_optimized!(expected, join);
-    }
-
-    #[tokio::test]
-    async fn test_join_no_swap() {
-        let (big, small) = create_big_and_small();
-
-        let join = HashJoinExec::try_new(
-            Arc::clone(&small),
-            Arc::clone(&big),
-            vec![(
-                Column::new_with_schema("small_col", &small.schema()).unwrap(),
-                Column::new_with_schema("big_col", &big.schema()).unwrap(),
-            )],
-            None,
-            &JoinType::Left,
-            PartitionMode::CollectLeft,
-            &false,
-        )
-        .unwrap();
-
-        let optimized_join = HashBuildProbeOrder::new()
-            .optimize(Arc::new(join), &SessionConfig::new())
-            .unwrap();
-
-        let swapped_join = optimized_join
-            .as_any()
-            .downcast_ref::<HashJoinExec>()
-            .expect("The type of the plan should not be changed");
-
-        assert_eq!(swapped_join.left().statistics().total_byte_size, Some(10));
-        assert_eq!(
-            swapped_join.right().statistics().total_byte_size,
-            Some(100000)
-        );
-    }
-
-    #[tokio::test]
-    async fn test_swap_reverting_projection() {
-        let left_schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-        ]);
-
-        let right_schema = Schema::new(vec![Field::new("c", DataType::Int32, false)]);
-
-        let proj = swap_reverting_projection(&left_schema, &right_schema);
-
-        assert_eq!(proj.len(), 3);
-
-        let (col, name) = &proj[0];
-        assert_eq!(name, "a");
-        assert_col_expr(col, "a", 1);
-
-        let (col, name) = &proj[1];
-        assert_eq!(name, "b");
-        assert_col_expr(col, "b", 2);
-
-        let (col, name) = &proj[2];
-        assert_eq!(name, "c");
-        assert_col_expr(col, "c", 0);
-    }
-
-    fn assert_col_expr(expr: &Arc<dyn PhysicalExpr>, name: &str, index: usize) {
-        let col = expr
-            .as_any()
-            .downcast_ref::<Column>()
-            .expect("Projection items should be Column expression");
-        assert_eq!(col.name(), name);
-        assert_eq!(col.index(), index);
-    }
-}
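
The swap helpers removed above are carried over into the new join_selection
module below. To make the column bookkeeping concrete, here is a standalone
sketch of the index arithmetic behind swap_reverting_projection (the real
function is private; this only mirrors its mapping):

    // After a swap, the join emits the original right columns first and the
    // original left columns after them; the reverting projection reads them
    // back in left-then-right order.
    fn swapped_output_indices(left_len: usize, right_len: usize) -> Vec<usize> {
        (0..left_len)
            .map(|i| right_len + i) // where the left columns now live
            .chain(0..right_len) // where the right columns now live
            .collect()
    }

    fn main() {
        // left = [a, b], right = [c]: read a from index 1, b from index 2,
        // and c from index 0, matching test_swap_reverting_projection.
        assert_eq!(swapped_output_indices(2, 1), vec![1, 2, 0]);
    }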
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
new file mode 100644
index 000000000..7428c5ed6
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -0,0 +1,979 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Select the proper PartitionMode and build side based on the available statistics for hash joins.
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::config::OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD;
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For a hash join with the partition mode [PartitionMode::Auto], the JoinSelection rule will make
+/// a cost-based decision to select which PartitionMode (Partitioned/CollectLeft) is optimal
+/// based on the statistics available from the inputs.
+/// If the statistics information is not available, the partition mode will fall back to [PartitionMode::Partitioned].
+///
+/// The JoinSelection rule will also reorder the build and probe phases of hash joins
+/// based on the statistics available from the inputs.
+/// The rule optimizes the order such that the left (build) side of the join is the smallest.
+/// If the statistics information is not available, the order stays the same as in the original query.
+/// The JoinSelection rule will also swap the left and right sides of a cross join to keep
+/// the left side the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO we need performance tests for swapping Right Semi/Right joins to Left Semi/Left joins when the right side is smaller, but not by much.
+// TODO in PrestoSQL, the optimizer flips join sides only if one side is smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times; by default this is 8 times.
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
+    // Get the left and right table's total bytes
+    // If both the left and right tables contain total_byte_size statistics,
+    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
+    let (left_size, right_size) = match (
+        left.statistics().total_byte_size,
+        right.statistics().total_byte_size,
+    ) {
+        (Some(l), Some(r)) => (Some(l), Some(r)),
+        _ => (left.statistics().num_rows, right.statistics().num_rows),
+    };
+
+    match (left_size, right_size) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    // Currently we do not trust a 0 value from the stats, because the stats collection might have bugs
+    // TODO check the logic in datasource::get_statistics_with_limit()
+    if let Some(size) = plan.statistics().total_byte_size {
+        size != 0 && size < collection_size_threshold
+    } else if let Some(row_count) = plan.statistics().num_rows {
+        row_count != 0 && row_count < collection_size_threshold
+    } else {
+        false
+    }
+}
+
+fn supports_swap(join_type: JoinType) -> bool {
+    match join_type {
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::Right
+        | JoinType::Full
+        | JoinType::LeftSemi
+        | JoinType::RightSemi
+        | JoinType::LeftAnti
+        | JoinType::RightAnti => true,
+    }
+}
+
+fn swap_join_type(join_type: JoinType) -> JoinType {
+    match join_type {
+        JoinType::Inner => JoinType::Inner,
+        JoinType::Full => JoinType::Full,
+        JoinType::Left => JoinType::Right,
+        JoinType::Right => JoinType::Left,
+        JoinType::LeftSemi => JoinType::RightSemi,
+        JoinType::RightSemi => JoinType::LeftSemi,
+        JoinType::LeftAnti => JoinType::RightAnti,
+        JoinType::RightAnti => JoinType::LeftAnti,
+    }
+}
+
+fn swap_hash_join(
+    hash_join: &HashJoinExec,
+    partition_mode: PartitionMode,
+    left: &Arc<dyn ExecutionPlan>,
+    right: &Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let new_join = HashJoinExec::try_new(
+        Arc::clone(right),
+        Arc::clone(left),
+        hash_join
+            .on()
+            .iter()
+            .map(|(l, r)| (r.clone(), l.clone()))
+            .collect(),
+        swap_join_filter(hash_join.filter()),
+        &swap_join_type(*hash_join.join_type()),
+        partition_mode,
+        hash_join.null_equals_null(),
+    )?;
+    if matches!(
+        hash_join.join_type(),
+        JoinType::LeftSemi
+            | JoinType::RightSemi
+            | JoinType::LeftAnti
+            | JoinType::RightAnti
+    ) {
+        Ok(Arc::new(new_join))
+    } else {
+        // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+        let proj = ProjectionExec::try_new(
+            swap_reverting_projection(&left.schema(), &right.schema()),
+            Arc::new(new_join),
+        )?;
+        Ok(Arc::new(proj))
+    }
+}
+
+/// When the order of the join is changed by the optimizer,
+/// the columns in the output should not be impacted.
+/// This helper creates the expressions that will allow to swap
+/// back the values from the original left as first columns and
+/// those on the right next
+fn swap_reverting_projection(
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
+    let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+    let right_len = right_cols.len();
+    let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), right_len + i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+
+    left_cols.chain(right_cols).collect()
+}
+
+/// Swaps join sides for filter column indices and produces new JoinFilter
+fn swap_join_filter(filter: &Option<JoinFilter>) -> Option<JoinFilter> {
+    filter.as_ref().map(|filter| {
+        let column_indices = filter
+            .column_indices()
+            .iter()
+            .map(|idx| {
+                let side = if matches!(idx.side, JoinSide::Left) {
+                    JoinSide::Right
+                } else {
+                    JoinSide::Left
+                };
+                ColumnIndex {
+                    index: idx.index,
+                    side,
+                }
+            })
+            .collect();
+
+        JoinFilter::new(
+            filter.expression().clone(),
+            column_indices,
+            filter.schema().clone(),
+        )
+    })
+}
+
+impl PhysicalOptimizerRule for JoinSelection {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        session_config: &SessionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let collect_left_threshold: usize = session_config
+            .config_options()
+            .read()
+            .get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
+            .unwrap_or_default()
+            .try_into()
+            .unwrap();
+        plan.transform_up(&|plan| {
+            if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+                match hash_join.partition_mode() {
+                    PartitionMode::Auto => {
+                        try_collect_left(hash_join, Some(collect_left_threshold))?
+                            .map_or_else(
+                                || Ok(Some(partitioned_hash_join(hash_join)?)),
+                                |v| Ok(Some(v)),
+                            )
+                    }
+                    PartitionMode::CollectLeft => try_collect_left(hash_join, None)?
+                        .map_or_else(
+                            || Ok(Some(partitioned_hash_join(hash_join)?)),
+                            |v| Ok(Some(v)),
+                        ),
+                    PartitionMode::Partitioned => {
+                        let left = hash_join.left();
+                        let right = hash_join.right();
+                        if should_swap_join_order(&**left, &**right)
+                            && supports_swap(*hash_join.join_type())
+                        {
+                            Ok(Some(swap_hash_join(
+                                hash_join,
+                                PartitionMode::Partitioned,
+                                left,
+                                right,
+                            )?))
+                        } else {
+                            Ok(None)
+                        }
+                    }
+                }
+            } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>()
+            {
+                let left = cross_join.left();
+                let right = cross_join.right();
+                if should_swap_join_order(&**left, &**right) {
+                    let new_join =
+                        CrossJoinExec::try_new(Arc::clone(right), Arc::clone(left))?;
+                    // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+                    let proj = ProjectionExec::try_new(
+                        swap_reverting_projection(&left.schema(), &right.schema()),
+                        Arc::new(new_join),
+                    )?;
+                    Ok(Some(Arc::new(proj)))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })
+    }
+
+    fn name(&self) -> &str {
+        "join_selection"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Try to create a PartitionMode::CollectLeft HashJoinExec when possible.
+/// The method first considers the current join type and checks whether it is applicable to run in CollectLeft mode,
+/// and will try to swap the join sides if the original join type cannot run in CollectLeft mode.
+/// When a collect_threshold is provided, the method will also check the sizes of both the left side and the right side.
+///
+/// For [JoinType::Full], CollectLeft mode is never applicable, and the method will return None.
+/// For [JoinType::Left] and [JoinType::LeftAnti], CollectLeft mode cannot run directly; the join type should be swapped to [JoinType::Right] and [JoinType::RightAnti] respectively.
+fn try_collect_left(
+    hash_join: &HashJoinExec,
+    collect_threshold: Option<usize>,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    let left = hash_join.left();
+    let right = hash_join.right();
+    let join_type = hash_join.join_type();
+
+    let left_can_collect = match join_type {
+        JoinType::Left | JoinType::Full | JoinType::LeftAnti => false,
+        JoinType::Inner
+        | JoinType::LeftSemi
+        | JoinType::Right
+        | JoinType::RightSemi
+        | JoinType::RightAnti => collect_threshold.map_or(true, |threshold| {
+            supports_collect_by_size(&**left, threshold)
+        }),
+    };
+    let right_can_collect = match join_type {
+        JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
+        JoinType::Inner
+        | JoinType::RightSemi
+        | JoinType::Left
+        | JoinType::LeftSemi
+        | JoinType::LeftAnti => collect_threshold.map_or(true, |threshold| {
+            supports_collect_by_size(&**right, threshold)
+        }),
+    };
+    match (left_can_collect, right_can_collect) {
+        (true, true) => {
+            if should_swap_join_order(&**left, &**right)
+                && supports_swap(*hash_join.join_type())
+            {
+                Ok(Some(swap_hash_join(
+                    hash_join,
+                    PartitionMode::CollectLeft,
+                    left,
+                    right,
+                )?))
+            } else {
+                Ok(Some(Arc::new(HashJoinExec::try_new(
+                    Arc::clone(left),
+                    Arc::clone(right),
+                    hash_join.on().to_vec(),
+                    hash_join.filter().clone(),
+                    hash_join.join_type(),
+                    PartitionMode::CollectLeft,
+                    hash_join.null_equals_null(),
+                )?)))
+            }
+        }
+        (true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
+            Arc::clone(left),
+            Arc::clone(right),
+            hash_join.on().to_vec(),
+            hash_join.filter().clone(),
+            hash_join.join_type(),
+            PartitionMode::CollectLeft,
+            hash_join.null_equals_null(),
+        )?))),
+        (false, true) => {
+            if supports_swap(*hash_join.join_type()) {
+                Ok(Some(swap_hash_join(
+                    hash_join,
+                    PartitionMode::CollectLeft,
+                    left,
+                    right,
+                )?))
+            } else {
+                Ok(None)
+            }
+        }
+        (false, false) => Ok(None),
+    }
+}
+
+fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
+    let left = hash_join.left();
+    let right = hash_join.right();
+    if should_swap_join_order(&**left, &**right) && supports_swap(*hash_join.join_type())
+    {
+        swap_hash_join(hash_join, PartitionMode::Partitioned, left, right)
+    } else {
+        Ok(Arc::new(HashJoinExec::try_new(
+            Arc::clone(left),
+            Arc::clone(right),
+            hash_join.on().to_vec(),
+            hash_join.filter().clone(),
+            hash_join.join_type(),
+            PartitionMode::Partitioned,
+            hash_join.null_equals_null(),
+        )?))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        physical_plan::{
+            displayable, joins::PartitionMode, ColumnStatistics, Statistics,
+        },
+        test::exec::StatisticsExec,
+    };
+
+    use super::*;
+    use std::sync::Arc;
+
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+
+    fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
+        let big = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10),
+                total_byte_size: Some(100000),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
+        ));
+
+        let small = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100000),
+                total_byte_size: Some(10),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
+        ));
+        (big, small)
+    }
+
+    /// Create a column statistics vector for a single column
+    /// that has the given min/max/distinct_count properties.
+    ///
+    /// Given min/max will be mapped to a [`ScalarValue`] if
+    /// they are not `None`.
+    fn create_column_stats(
+        min: Option<u64>,
+        max: Option<u64>,
+        distinct_count: Option<usize>,
+    ) -> Option<Vec<ColumnStatistics>> {
+        Some(vec![ColumnStatistics {
+            distinct_count,
+            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+            ..Default::default()
+        }])
+    }
+
+    /// Returns three plans with statistics of (min, max, distinct_count)
+    /// * big 100K rows @ (0, 50k, 50k)
+    /// * medium 10K rows @ (1k, 5k, 1k)
+    /// * small 1K rows @ (0, 100k, 1k)
+    fn create_nested_with_min_max() -> (
+        Arc<dyn ExecutionPlan>,
+        Arc<dyn ExecutionPlan>,
+        Arc<dyn ExecutionPlan>,
+    ) {
+        let big = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100_000),
+                column_statistics: create_column_stats(
+                    Some(0),
+                    Some(50_000),
+                    Some(50_000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
+        ));
+
+        let medium = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10_000),
+                column_statistics: create_column_stats(
+                    Some(1000),
+                    Some(5000),
+                    Some(1000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]),
+        ));
+
+        let small = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(1000),
+                column_statistics: create_column_stats(
+                    Some(0),
+                    Some(100_000),
+                    Some(1000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
+        ));
+
+        (big, medium, small)
+    }
+
+    #[tokio::test]
+    async fn test_join_with_swap() {
+        let (big, small) = create_big_and_small();
+
+        let join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        let optimized_join = JoinSelection::new()
+            .optimize(Arc::new(join), &SessionConfig::new())
+            .unwrap();
+
+        let swapping_projection = optimized_join
+            .as_any()
+            .downcast_ref::<ProjectionExec>()
+            .expect("A proj is required to swap columns back to their original order");
+
+        assert_eq!(swapping_projection.expr().len(), 2);
+        let (col, name) = &swapping_projection.expr()[0];
+        assert_eq!(name, "big_col");
+        assert_col_expr(col, "big_col", 1);
+        let (col, name) = &swapping_projection.expr()[1];
+        assert_eq!(name, "small_col");
+        assert_col_expr(col, "small_col", 0);
+
+        let swapped_join = swapping_projection
+            .input()
+            .as_any()
+            .downcast_ref::<HashJoinExec>()
+            .expect("The type of the plan should not be changed");
+
+        assert_eq!(swapped_join.left().statistics().total_byte_size, Some(10));
+        assert_eq!(
+            swapped_join.right().statistics().total_byte_size,
+            Some(100000)
+        );
+    }
+
+    #[tokio::test]
+    async fn test_left_join_with_swap() {
+        let (big, small) = create_big_and_small();
+        // A left outer join should always swap when the mode is PartitionMode::CollectLeft, even if the left side is small and the right side is large
+        let join = HashJoinExec::try_new(
+            Arc::clone(&small),
+            Arc::clone(&big),
+            vec![(
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        let optimized_join = JoinSelection::new()
+            .optimize(Arc::new(join), &SessionConfig::new())
+            .unwrap();
+
+        let swapping_projection = optimized_join
+            .as_any()
+            .downcast_ref::<ProjectionExec>()
+            .expect("A proj is required to swap columns back to their original order");
+
+        assert_eq!(swapping_projection.expr().len(), 2);
+        println!("swapping_projection {:?}", swapping_projection);
+        let (col, name) = &swapping_projection.expr()[0];
+        assert_eq!(name, "small_col");
+        assert_col_expr(col, "small_col", 1);
+        let (col, name) = &swapping_projection.expr()[1];
+        assert_eq!(name, "big_col");
+        assert_col_expr(col, "big_col", 0);
+
+        let swapped_join = swapping_projection
+            .input()
+            .as_any()
+            .downcast_ref::<HashJoinExec>()
+            .expect("The type of the plan should not be changed");
+
+        assert_eq!(
+            swapped_join.left().statistics().total_byte_size,
+            Some(100000)
+        );
+        assert_eq!(swapped_join.right().statistics().total_byte_size, Some(10));
+    }
+
+    #[tokio::test]
+    async fn test_join_with_swap_semi() {
+        let join_types = [JoinType::LeftSemi, JoinType::LeftAnti];
+        for join_type in join_types {
+            let (big, small) = create_big_and_small();
+
+            let join = HashJoinExec::try_new(
+                Arc::clone(&big),
+                Arc::clone(&small),
+                vec![(
+                    Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                    Column::new_with_schema("small_col", &small.schema()).unwrap(),
+                )],
+                None,
+                &join_type,
+                PartitionMode::Partitioned,
+                &false,
+            )
+            .unwrap();
+
+            let original_schema = join.schema();
+
+            let optimized_join = JoinSelection::new()
+                .optimize(Arc::new(join), &SessionConfig::new())
+                .unwrap();
+
+            let swapped_join = optimized_join
+                .as_any()
+                .downcast_ref::<HashJoinExec>()
+                .expect(
+                    "A proj is not required to swap columns back to their original order",
+                );
+
+            assert_eq!(swapped_join.schema().fields().len(), 1);
+
+            assert_eq!(swapped_join.left().statistics().total_byte_size, Some(10));
+            assert_eq!(
+                swapped_join.right().statistics().total_byte_size,
+                Some(100000)
+            );
+
+            assert_eq!(original_schema, swapped_join.schema());
+        }
+    }
+
+    /// Compare the input plan with the plan after running the join selection optimizer.
+    macro_rules! assert_optimized {
+        ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            let expected_lines =
+                $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
+
+            let optimized = JoinSelection::new()
+                .optimize(Arc::new($PLAN), &SessionConfig::new())
+                .unwrap();
+
+            let plan = displayable(optimized.as_ref()).indent().to_string();
+            let actual_lines = plan.split("\n").collect::<Vec<&str>>();
+
+            assert_eq!(
+                &expected_lines, &actual_lines,
+                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+                expected_lines, actual_lines
+            );
+        };
+    }
+
+    #[tokio::test]
+    async fn test_nested_join_swap() {
+        let (big, medium, small) = create_nested_with_min_max();
+
+        // Form the inner join: big JOIN small
+        let child_join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Inner,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+        let child_schema = child_join.schema();
+
+        // Form join tree `medium LEFT JOIN (big JOIN small)`
+        let join = HashJoinExec::try_new(
+            Arc::clone(&medium),
+            Arc::new(child_join),
+            vec![(
+                Column::new_with_schema("medium_col", &medium.schema()).unwrap(),
+                Column::new_with_schema("small_col", &child_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        // Hash join uses the left side to build the hash table, and right side to probe it. We want
+        // to keep left as small as possible, so if we can estimate (with a reasonable margin of error)
+        // that the left side is smaller than the right side, we should swap the sides.
+        //
+        // The first hash join's left is 'small' table (with 1000 rows), and the second hash join's
+        // left is the F(small IJ big) which has an estimated cardinality of 2000 rows (vs medium which
+        // has an exact cardinality of 10_000 rows).
+        let expected = [
+            "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as big_col, small_col@1 as small_col]",
+            "  HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column { name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
+            "    ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as small_col]",
+            "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\", index: 0 })]",
+            "        StatisticsExec: col_count=1, row_count=Some(1000)",
+            "        StatisticsExec: col_count=1, row_count=Some(100000)",
+            "    StatisticsExec: col_count=1, row_count=Some(10000)",
+            ""
+        ];
+        assert_optimized!(expected, join);
+    }
+
+    #[tokio::test]
+    async fn test_join_no_swap() {
+        let (big, small) = create_big_and_small();
+        let join = HashJoinExec::try_new(
+            Arc::clone(&small),
+            Arc::clone(&big),
+            vec![(
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Inner,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        let optimized_join = JoinSelection::new()
+            .optimize(Arc::new(join), &SessionConfig::new())
+            .unwrap();
+
+        let swapped_join = optimized_join
+            .as_any()
+            .downcast_ref::<HashJoinExec>()
+            .expect("The type of the plan should not be changed");
+
+        assert_eq!(swapped_join.left().statistics().total_byte_size, Some(10));
+        assert_eq!(
+            swapped_join.right().statistics().total_byte_size,
+            Some(100000)
+        );
+    }
+
+    #[tokio::test]
+    async fn test_swap_reverting_projection() {
+        let left_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+
+        let right_schema = Schema::new(vec![Field::new("c", DataType::Int32, false)]);
+
+        let proj = swap_reverting_projection(&left_schema, &right_schema);
+
+        assert_eq!(proj.len(), 3);
+
+        let (col, name) = &proj[0];
+        assert_eq!(name, "a");
+        assert_col_expr(col, "a", 1);
+
+        let (col, name) = &proj[1];
+        assert_eq!(name, "b");
+        assert_col_expr(col, "b", 2);
+
+        let (col, name) = &proj[2];
+        assert_eq!(name, "c");
+        assert_col_expr(col, "c", 0);
+    }
+
+    fn assert_col_expr(expr: &Arc<dyn PhysicalExpr>, name: &str, index: usize) {
+        let col = expr
+            .as_any()
+            .downcast_ref::<Column>()
+            .expect("Projection items should be Column expression");
+        assert_eq!(col.name(), name);
+        assert_eq!(col.index(), index);
+    }
+
+    #[tokio::test]
+    async fn test_join_selection_collect_left() {
+        let big = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10000000),
+                total_byte_size: Some(10000000),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
+        ));
+
+        let small = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10),
+                total_byte_size: Some(10),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
+        ));
+
+        let empty = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: None,
+                total_byte_size: None,
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]),
+        ));
+
+        let join_on = vec![(
+            Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            Column::new_with_schema("big_col", &big.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            small.clone(),
+            big.clone(),
+            join_on,
+            false,
+            PartitionMode::CollectLeft,
+        );
+
+        let join_on = vec![(
+            Column::new_with_schema("big_col", &big.schema()).unwrap(),
+            Column::new_with_schema("small_col", &small.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            big,
+            small.clone(),
+            join_on,
+            true,
+            PartitionMode::CollectLeft,
+        );
+
+        let join_on = vec![(
+            Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            small.clone(),
+            empty.clone(),
+            join_on,
+            false,
+            PartitionMode::CollectLeft,
+        );
+
+        let join_on = vec![(
+            Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
+            Column::new_with_schema("small_col", &small.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            empty,
+            small,
+            join_on,
+            true,
+            PartitionMode::CollectLeft,
+        );
+    }
+
+    #[tokio::test]
+    async fn test_join_selection_partitioned() {
+        let big1 = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10000000),
+                total_byte_size: Some(10000000),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col1", DataType::Int32, false)]),
+        ));
+
+        let big2 = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(20000000),
+                total_byte_size: Some(20000000),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col2", DataType::Int32, false)]),
+        ));
+
+        let empty = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: None,
+                total_byte_size: None,
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]),
+        ));
+
+        let join_on = vec![(
+            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+            Column::new_with_schema("big_col2", &big2.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            big1.clone(),
+            big2.clone(),
+            join_on,
+            false,
+            PartitionMode::Partitioned,
+        );
+
+        let join_on = vec![(
+            Column::new_with_schema("big_col2", &big2.schema()).unwrap(),
+            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            big2,
+            big1.clone(),
+            join_on,
+            true,
+            PartitionMode::Partitioned,
+        );
+
+        let join_on = vec![(
+            Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
+            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            empty.clone(),
+            big1.clone(),
+            join_on,
+            false,
+            PartitionMode::Partitioned,
+        );
+
+        let join_on = vec![(
+            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+            Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
+        )];
+        check_join_partition_mode(
+            big1,
+            empty,
+            join_on,
+            false,
+            PartitionMode::Partitioned,
+        );
+    }
+
+    fn check_join_partition_mode(
+        left: Arc<StatisticsExec>,
+        right: Arc<StatisticsExec>,
+        on: Vec<(Column, Column)>,
+        is_swapped: bool,
+        expected_mode: PartitionMode,
+    ) {
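+        // Build the join in `Auto` mode and run the JoinSelection rule;
+        // `is_swapped` indicates whether the rule is expected to swap the
+        // inputs, wrapping the join in a projection that restores the
+        // original column order.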
+        let join = HashJoinExec::try_new(
+            left,
+            right,
+            on,
+            None,
+            &JoinType::Inner,
+            PartitionMode::Auto,
+            &false,
+        )
+        .unwrap();
+
+        let optimized_join = JoinSelection::new()
+            .optimize(Arc::new(join), &SessionConfig::new())
+            .unwrap();
+
+        if !is_swapped {
+            let swapped_join = optimized_join
+                .as_any()
+                .downcast_ref::<HashJoinExec>()
+                .expect("The type of the plan should not be changed");
+            assert_eq!(*swapped_join.partition_mode(), expected_mode);
+        } else {
+            let swapping_projection = optimized_join
+                .as_any()
+                .downcast_ref::<ProjectionExec>()
+                .expect(
+                    "A proj is required to swap columns back to their original order",
+                );
+            let swapped_join = swapping_projection
+                .input()
+                .as_any()
+                .downcast_ref::<HashJoinExec>()
+                .expect("The type of the plan should not be changed");
+
+            assert_eq!(*swapped_join.partition_mode(), expected_mode);
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 5050dae35..36b00a0e0 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -21,7 +21,7 @@
 pub mod aggregate_statistics;
 pub mod coalesce_batches;
 pub mod enforcement;
-pub mod hash_build_probe_order;
+pub mod join_selection;
 pub mod optimizer;
 pub mod pruning;
 pub mod repartition;
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index ead84e66c..24b0f90d7 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -296,6 +296,10 @@ impl ExecutionPlan for HashJoinExec {
                     Distribution::HashPartitioned(right_expr),
                 ]
             }
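+            // `Auto` has not been resolved to a concrete mode yet (the
+            // JoinSelection rule does that), so nothing can be required here.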
+            PartitionMode::Auto => vec![
+                Distribution::UnspecifiedDistribution,
+                Distribution::UnspecifiedDistribution,
+            ],
         }
     }
 
@@ -323,6 +327,9 @@ impl ExecutionPlan for HashJoinExec {
                 self.right.output_partitioning(),
                 left_columns_len,
             ),
+            PartitionMode::Auto => Partitioning::UnknownPartitioning(
+                self.right.output_partitioning().partition_count(),
+            ),
         }
     }
 
@@ -387,6 +394,12 @@ impl ExecutionPlan for HashJoinExec {
                 on_left.clone(),
                 context.clone(),
             )),
+            PartitionMode::Auto => {
+                return Err(DataFusionError::Plan(format!(
+                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
+                    PartitionMode::Auto
+                )))
+            }
         };
 
         // we have the batches and the hash map with their keys. We can now create a stream
diff --git a/datafusion/core/src/physical_plan/joins/mod.rs b/datafusion/core/src/physical_plan/joins/mod.rs
index ae8f943af..8066c7d9c 100644
--- a/datafusion/core/src/physical_plan/joins/mod.rs
+++ b/datafusion/core/src/physical_plan/joins/mod.rs
@@ -29,6 +29,9 @@ pub enum PartitionMode {
     Partitioned,
     /// Left side will be collected into one partition
     CollectLeft,
+    /// When set to Auto, the DataFusion optimizer will decide which PartitionMode (Partitioned/CollectLeft) is optimal based on statistics.
+    /// It will also consider swapping the left and right inputs for the join.
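+    /// A join built with `Auto` (e.g. `HashJoinExec::try_new(.., PartitionMode::Auto, ..)`)
+    /// must be resolved by the optimizer before execution; `execute()` rejects an unresolved `Auto`.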
+    Auto,
 }
 
 pub use cross_join::CrossJoinExec;
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 93d2b862e..7ae4192e4 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 //! Defines the Sort-Merge join execution plan.
-//! A sort-merge join plan consumes two sorted children plan and produces
+//! A Sort-Merge join plan consumes two sorted child plans and produces
 //! joined output by given join type and other options.
+//! The Sort-Merge join feature is currently experimental.
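+//! It is only planned when the configuration option
+//! `datafusion.optimizer.prefer_hash_join` is set to `false` (with
+//! `repartition_joins` enabled and `target_partitions > 1`), e.g. (a minimal
+//! sketch, assuming the boolean config setter):
+//! `SessionConfig::new().set_bool(OPT_PREFER_HASH_JOIN, false)`.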
 
 use std::any::Any;
 use std::cmp::Ordering;
@@ -44,7 +45,7 @@ use crate::physical_plan::expressions::Column;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::joins::utils::{
     build_join_schema, check_join_is_valid, combine_join_equivalence_properties,
-    partitioned_join_output_partitioning, JoinOn,
+    estimate_join_statistics, partitioned_join_output_partitioning, JoinOn,
 };
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use crate::physical_plan::{
@@ -349,7 +350,15 @@ impl ExecutionPlan for SortMergeJoinExec {
     }
 
     fn statistics(&self) -> Statistics {
-        todo!()
+        // TODO stats: it is not possible in general to know the output size of joins
+        // There are some special cases though, for example:
+        // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
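+        // For now, reuse the shared cardinality estimation from joins::utils
+        // (the same estimate the hash join relies on):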
+        estimate_join_statistics(
+            self.left.clone(),
+            self.right.clone(),
+            self.on.clone(),
+            &self.join_type,
+        )
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 691c935b8..19d3c3e08 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -23,7 +23,9 @@ use super::{
     aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
     values::ValuesExec, windows,
 };
-use crate::config::{OPT_EXPLAIN_LOGICAL_PLAN_ONLY, OPT_EXPLAIN_PHYSICAL_PLAN_ONLY};
+use crate::config::{
+    OPT_EXPLAIN_LOGICAL_PLAN_ONLY, OPT_EXPLAIN_PHYSICAL_PLAN_ONLY, OPT_PREFER_HASH_JOIN,
+};
 use crate::datasource::source_as_provider;
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
@@ -44,6 +46,7 @@ use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
 use crate::physical_plan::filter::FilterExec;
 use crate::physical_plan::joins::CrossJoinExec;
 use crate::physical_plan::joins::HashJoinExec;
+use crate::physical_plan::joins::SortMergeJoinExec;
 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
@@ -930,17 +933,47 @@ impl DefaultPhysicalPlanner {
                         _ => None
                     };
 
+                    let prefer_hash_join = session_state.config.config_options()
+                        .read()
+                        .get_bool(OPT_PREFER_HASH_JOIN)
+                        .unwrap_or_default();
                     if session_state.config.target_partitions > 1
                         && session_state.config.repartition_joins
+                        && !prefer_hash_join
                     {
-                        // Use hash partition by default to parallelize hash joins
+                        // Use SortMergeJoin if hash join is not preferred
+                        // Sort-Merge join support is currently experimental
+                        if join_filter.is_some() {
+                            // TODO: SortMergeJoinExec needs to support join filters
+                            Err(DataFusionError::NotImplemented("SortMergeJoinExec does not support join_filter yet.".to_string()))
+                        } else {
+                            let join_on_len = join_on.len();
+                            Ok(Arc::new(SortMergeJoinExec::try_new(
+                                physical_left,
+                                physical_right,
+                                join_on,
+                                *join_type,
+                                vec![SortOptions::default(); join_on_len],
+                                *null_equals_null,
+                            )?))
+                        }
+                    } else if session_state.config.target_partitions > 1
+                        && session_state.config.repartition_joins
+                        && prefer_hash_join {
+                         let partition_mode = {
+                            if session_state.config.collect_statistics {
+                                PartitionMode::Auto
+                            } else {
+                                PartitionMode::Partitioned
+                            }
+                         };
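+                         // `Auto` defers the Partitioned-vs-CollectLeft choice
+                         // (and a possible input swap) to the JoinSelection
+                         // physical optimizer rule, which needs collected
+                         // statistics to decide soundly.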
                         Ok(Arc::new(HashJoinExec::try_new(
                             physical_left,
                             physical_right,
                             join_on,
                             join_filter,
                             join_type,
-                            PartitionMode::Partitioned,
+                            partition_mode,
                             null_equals_null,
                         )?))
                     } else {
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index f224e1365..fbd740fae 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -698,24 +698,27 @@ async fn show_all() {
 
     // Has all the default values, should be in order by name
     let expected = vec![
-        "+-------------------------------------------------+---------+",
-        "| name                                            | setting |",
-        "+-------------------------------------------------+---------+",
-        "| datafusion.catalog.location                     | NULL    |",
-        "| datafusion.catalog.type                         | NULL    |",
-        "| datafusion.execution.batch_size                 | 8192    |",
-        "| datafusion.execution.coalesce_batches           | true    |",
-        "| datafusion.execution.coalesce_target_batch_size | 4096    |",
-        "| datafusion.execution.parquet.enable_page_index  | false   |",
-        "| datafusion.execution.parquet.pushdown_filters   | false   |",
-        "| datafusion.execution.parquet.reorder_filters    | false   |",
-        "| datafusion.execution.time_zone                  | +00:00  |",
-        "| datafusion.explain.logical_plan_only            | false   |",
-        "| datafusion.explain.physical_plan_only           | false   |",
-        "| datafusion.optimizer.filter_null_join_keys      | false   |",
-        "| datafusion.optimizer.max_passes                 | 3       |",
-        "| datafusion.optimizer.skip_failed_rules          | true    |",
-        "+-------------------------------------------------+---------+",
+        "+-----------------------------------------------------------+---------+",
+        "| name                                                      | setting |",
+        "+-----------------------------------------------------------+---------+",
+        "| datafusion.catalog.location                               | NULL    |",
+        "| datafusion.catalog.type                                   | NULL    |",
+        "| datafusion.execution.batch_size                           | 8192    |",
+        "| datafusion.execution.coalesce_batches                     | true    |",
+        "| datafusion.execution.coalesce_target_batch_size           | 4096    |",
+        "| datafusion.execution.parquet.enable_page_index            | false   |",
+        "| datafusion.execution.parquet.pushdown_filters             | false   |",
+        "| datafusion.execution.parquet.reorder_filters              | false   |",
+        "| datafusion.execution.time_zone                            | +00:00  |",
+        "| datafusion.explain.logical_plan_only                      | false   |",
+        "| datafusion.explain.physical_plan_only                     | false   |",
+        "| datafusion.optimizer.filter_null_join_keys                | false   |",
+        "| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 |",
+        "| datafusion.optimizer.max_passes                           | 3       |",
+        "| datafusion.optimizer.prefer_hash_join                     | true    |",
+        "| datafusion.optimizer.skip_failed_rules                    | true    |",
+        "| datafusion.optimizer.top_down_join_key_reordering         | true    |",
+        "+-----------------------------------------------------------+---------+",
     ];
 
     assert_batches_eq!(expected, &results);
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 324ccb4c7..7dca0e91c 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -20,247 +20,305 @@ use datafusion::from_slice::FromSlice;
 
 #[tokio::test]
 async fn equijoin() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let equivalent_sql = [
-        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
-    ];
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 44    | d       | x       |",
-        "+-------+---------+---------+",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
-    }
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let equivalent_sql = [
+            "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+        ];
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 44    | d       | x       |",
+            "+-------+---------+---------+",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
 
-    let ctx = create_join_context_qualified("t1", "t2")?;
-    let equivalent_sql = [
-        "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t1.a = t2.a ORDER BY t1.a",
-        "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t2.a = t1.a ORDER BY t1.a",
-    ];
-    let expected = vec![
-        "+---+-----+",
-        "| a | b   |",
-        "+---+-----+",
-        "| 1 | 100 |",
-        "| 2 | 200 |",
-        "| 4 | 400 |",
-        "+---+-----+",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
+        let ctx = create_join_context_qualified("t1", "t2")?;
+        let equivalent_sql = [
+            "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t1.a = t2.a ORDER BY t1.a",
+            "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t2.a = t1.a ORDER BY t1.a",
+        ];
+        let expected = vec![
+            "+---+-----+",
+            "| a | b   |",
+            "+---+-----+",
+            "| 1 | 100 |",
+            "| 2 | 200 |",
+            "| 4 | 400 |",
+            "+---+-----+",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
     }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_multiple_condition_ordering() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let equivalent_sql = [
-        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t1_name <> t2_name ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name <> t1_name ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t1_name <> t2_name ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t2_name <> t1_name ORDER BY t1_id",
-    ];
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 44    | d       | x       |",
-        "+-------+---------+---------+",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let equivalent_sql = [
+            "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t1_name <> t2_name ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name <> t1_name ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t1_name <> t2_name ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t2_name <> t1_name ORDER BY t1_id",
+        ];
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 44    | d       | x       |",
+            "+-------+---------+---------+",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
     }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_and_other_condition() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "+-------+---------+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "+-------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_left_and_condition_from_right() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
-    let res = ctx.create_logical_plan(sql);
-    assert!(res.is_ok());
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 33    | c       |         |",
-        "| 44    | d       |         |",
-        "+-------+---------+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+        let res = ctx.create_logical_plan(sql);
+        assert!(res.is_ok());
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 33    | c       |         |",
+            "| 44    | d       |         |",
+            "+-------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn full_join_sub_query() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql = "
+        SELECT t1_id, t1_name, t2_name FROM (SELECT * from (t1) AS t1) FULL JOIN (SELECT * from (t2) AS t2) ON t1_id = t2_id AND t2_name >= 'y'
+         ORDER BY t1_id, t2_name";
+        let res = ctx.create_logical_plan(sql);
+        assert!(res.is_ok());
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 33    | c       |         |",
+            "| 44    | d       |         |",
+            "|       |         | w       |",
+            "|       |         | x       |",
+            "+-------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_right_and_condition_from_left() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t1_id >= 22 ORDER BY t2_name";
-    let res = ctx.create_logical_plan(sql);
-    assert!(res.is_ok());
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "|       |         | w       |",
-        "| 44    | d       | x       |",
-        "| 22    | b       | y       |",
-        "|       |         | z       |",
-        "+-------+---------+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t1_id >= 22 ORDER BY t2_name";
+        let res = ctx.create_logical_plan(sql);
+        assert!(res.is_ok());
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "|       |         | w       |",
+            "| 44    | d       | x       |",
+            "| 22    | b       | y       |",
+            "|       |         | z       |",
+            "+-------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_left_and_condition_from_left() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44 ORDER BY t1_id";
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       |         |",
-        "| 22    | b       |         |",
-        "| 33    | c       |         |",
-        "| 44    | d       | x       |",
-        "+-------+---------+---------+",
-    ];
-    let actual = execute_to_batches(&ctx, sql).await;
-    assert_batches_eq!(expected, &actual);
-
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44 ORDER BY t1_id";
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       |         |",
+            "| 22    | b       |         |",
+            "| 33    | c       |         |",
+            "| 44    | d       | x       |",
+            "+-------+---------+---------+",
+        ];
+        let actual = execute_to_batches(&ctx, sql).await;
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_left_and_condition_from_both() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_id, t1_int, t2_int FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_int >= t2_int ORDER BY t1_id";
-    let expected = vec![
-        "+-------+--------+--------+",
-        "| t1_id | t1_int | t2_int |",
-        "+-------+--------+--------+",
-        "| 11    | 1      |        |",
-        "| 22    | 2      | 1      |",
-        "| 33    | 3      |        |",
-        "| 44    | 4      | 3      |",
-        "+-------+--------+--------+",
-    ];
-    let actual = execute_to_batches(&ctx, sql).await;
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_id, t1_int, t2_int FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_int >= t2_int ORDER BY t1_id";
+        let expected = vec![
+            "+-------+--------+--------+",
+            "| t1_id | t1_int | t2_int |",
+            "+-------+--------+--------+",
+            "| 11    | 1      |        |",
+            "| 22    | 2      | 1      |",
+            "| 33    | 3      |        |",
+            "| 44    | 4      | 3      |",
+            "+-------+--------+--------+",
+        ];
+        let actual = execute_to_batches(&ctx, sql).await;
+        assert_batches_eq!(expected, &actual);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_right_and_condition_from_right() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_id >= 22 ORDER BY t2_name";
-    let res = ctx.create_logical_plan(sql);
-    assert!(res.is_ok());
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "|       |         | w       |",
-        "| 44    | d       | x       |",
-        "| 22    | b       | y       |",
-        "|       |         | z       |",
-        "+-------+---------+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_id >= 22 ORDER BY t2_name";
+        let res = ctx.create_logical_plan(sql);
+        assert!(res.is_ok());
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "|       |         | w       |",
+            "| 44    | d       | x       |",
+            "| 22    | b       | y       |",
+            "|       |         | z       |",
+            "+-------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_right_and_condition_from_both() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_int, t2_int, t2_id FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int ORDER BY t2_id";
-    let res = ctx.create_logical_plan(sql);
-    assert!(res.is_ok());
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------+--------+-------+",
-        "| t1_int | t2_int | t2_id |",
-        "+--------+--------+-------+",
-        "|        | 3      | 11    |",
-        "| 2      | 1      | 22    |",
-        "| 4      | 3      | 44    |",
-        "|        | 3      | 55    |",
-        "+--------+--------+-------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_int, t2_int, t2_id FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int ORDER BY t2_id";
+        let res = ctx.create_logical_plan(sql);
+        assert!(res.is_ok());
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+--------+--------+-------+",
+            "| t1_int | t2_int | t2_id |",
+            "+--------+--------+-------+",
+            "|        | 3      | 11    |",
+            "| 2      | 1      | 22    |",
+            "| 4      | 3      | 44    |",
+            "|        | 3      | 55    |",
+            "+--------+--------+-------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn left_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let equivalent_sql = [
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
-    ];
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 33    | c       |         |",
-        "| 44    | d       | x       |",
-        "+-------+---------+---------+",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let equivalent_sql = [
+            "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+        ];
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 33    | c       |         |",
+            "| 44    | d       | x       |",
+            "+-------+---------+---------+",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
     }
     Ok(())
 }
 
 #[tokio::test]
 async fn left_join_unbalanced() -> Result<()> {
-    // the t1_id is larger than t2_id so the hash_build_probe_order optimizer should kick in
+    // the t1 table is larger than t2 so the join_selection optimizer should kick in
     let ctx = create_join_context_unbalanced("t1_id", "t2_id")?;
     let equivalent_sql = [
         "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
@@ -500,58 +558,64 @@ async fn full_join_not_null_filter() -> Result<()> {
 
 #[tokio::test]
 async fn right_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let equivalent_sql = [
-        "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t2_id = t1_id ORDER BY t1_id"
-    ];
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 44    | d       | x       |",
-        "|       |         | w       |",
-        "+-------+---------+---------+",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let equivalent_sql = [
+            "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t2_id = t1_id ORDER BY t1_id"
+        ];
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 44    | d       | x       |",
+            "|       |         | w       |",
+            "+-------+---------+---------+",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
     }
     Ok(())
 }
 
 #[tokio::test]
 async fn full_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let equivalent_sql = [
-        "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
-    ];
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 33    | c       |         |",
-        "| 44    | d       | x       |",
-        "|       |         | w       |",
-        "+-------+---------+---------+",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
-    }
-
-    let equivalent_sql = [
-        "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let equivalent_sql = [
+            "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+        ];
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 33    | c       |         |",
+            "| 44    | d       | x       |",
+            "|       |         | w       |",
+            "+-------+---------+---------+",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
+
+        let equivalent_sql = [
+            "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
     }
 
     Ok(())
@@ -559,149 +623,165 @@ async fn full_join() -> Result<()> {
 
 #[tokio::test]
 async fn left_join_using() -> Result<()> {
-    let ctx = create_join_context("id", "id")?;
-    let sql = "SELECT id, t1_name, t2_name FROM t1 LEFT JOIN t2 USING (id) ORDER BY id";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+----+---------+---------+",
-        "| id | t1_name | t2_name |",
-        "+----+---------+---------+",
-        "| 11 | a       | z       |",
-        "| 22 | b       | y       |",
-        "| 33 | c       |         |",
-        "| 44 | d       | x       |",
-        "+----+---------+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("id", "id", repartition_joins)?;
+        let sql =
+            "SELECT id, t1_name, t2_name FROM t1 LEFT JOIN t2 USING (id) ORDER BY id";
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+----+---------+---------+",
+            "| id | t1_name | t2_name |",
+            "+----+---------+---------+",
+            "| 11 | a       | z       |",
+            "| 22 | b       | y       |",
+            "| 33 | c       |         |",
+            "| 44 | d       | x       |",
+            "+----+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_implicit_syntax() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let equivalent_sql = [
-        "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id",
-        "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id",
-    ];
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 44    | d       | x       |",
-        "+-------+---------+---------+",
-    ];
-    for sql in equivalent_sql.iter() {
-        let actual = execute_to_batches(&ctx, sql).await;
-        assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let equivalent_sql = [
+            "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id",
+            "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id",
+        ];
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 44    | d       | x       |",
+            "+-------+---------+---------+",
+        ];
+        for sql in equivalent_sql.iter() {
+            let actual = execute_to_batches(&ctx, sql).await;
+            assert_batches_eq!(expected, &actual);
+        }
     }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_implicit_syntax_with_filter() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql = "SELECT t1_id, t1_name, t2_name \
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql = "SELECT t1_id, t1_name, t2_name \
         FROM t1, t2 \
         WHERE t1_id > 0 \
         AND t1_id = t2_id \
         AND t2_id < 99 \
         ORDER BY t1_id";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 44    | d       | x       |",
-        "+-------+---------+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 44    | d       | x       |",
+            "+-------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn equijoin_implicit_syntax_reversed() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-    let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 22    | b       | y       |",
-        "| 44    | d       | x       |",
-        "+-------+---------+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+        let sql =
+            "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id";
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 22    | b       | y       |",
+            "| 44    | d       | x       |",
+            "+-------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
 #[tokio::test]
 async fn cross_join() {
-    let ctx = create_join_context("t1_id", "t2_id").unwrap();
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins).unwrap();
 
-    let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id";
-    let actual = execute(&ctx, sql).await;
+        let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id";
+        let actual = execute(&ctx, sql).await;
 
-    assert_eq!(4 * 4, actual.len());
+        assert_eq!(4 * 4, actual.len());
 
-    let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id";
-    let actual = execute(&ctx, sql).await;
+        let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id";
+        let actual = execute(&ctx, sql).await;
 
-    assert_eq!(4 * 4, actual.len());
+        assert_eq!(4 * 4, actual.len());
 
-    let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2";
+        let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2";
 
-    let actual = execute(&ctx, sql).await;
-    assert_eq!(4 * 4, actual.len());
+        let actual = execute(&ctx, sql).await;
+        assert_eq!(4 * 4, actual.len());
 
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+---------+",
-        "| t1_id | t1_name | t2_name |",
-        "+-------+---------+---------+",
-        "| 11    | a       | z       |",
-        "| 11    | a       | y       |",
-        "| 11    | a       | x       |",
-        "| 11    | a       | w       |",
-        "| 22    | b       | z       |",
-        "| 22    | b       | y       |",
-        "| 22    | b       | x       |",
-        "| 22    | b       | w       |",
-        "| 33    | c       | z       |",
-        "| 33    | c       | y       |",
-        "| 33    | c       | x       |",
-        "| 33    | c       | w       |",
-        "| 44    | d       | z       |",
-        "| 44    | d       | y       |",
-        "| 44    | d       | x       |",
-        "| 44    | d       | w       |",
-        "+-------+---------+---------+",
-    ];
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+---------+",
+            "| t1_id | t1_name | t2_name |",
+            "+-------+---------+---------+",
+            "| 11    | a       | z       |",
+            "| 11    | a       | y       |",
+            "| 11    | a       | x       |",
+            "| 11    | a       | w       |",
+            "| 22    | b       | z       |",
+            "| 22    | b       | y       |",
+            "| 22    | b       | x       |",
+            "| 22    | b       | w       |",
+            "| 33    | c       | z       |",
+            "| 33    | c       | y       |",
+            "| 33    | c       | x       |",
+            "| 33    | c       | w       |",
+            "| 44    | d       | z       |",
+            "| 44    | d       | y       |",
+            "| 44    | d       | x       |",
+            "| 44    | d       | w       |",
+            "+-------+---------+---------+",
+        ];
 
-    assert_batches_eq!(expected, &actual);
+        assert_batches_eq!(expected, &actual);
 
-    // Two partitions (from UNION) on the left
-    let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT t1_id, t1_name FROM t1) AS t1 CROSS JOIN t2";
-    let actual = execute(&ctx, sql).await;
+        // Two partitions (from UNION) on the left
+        let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT t1_id, t1_name FROM t1) AS t1 CROSS JOIN t2";
+        let actual = execute(&ctx, sql).await;
 
-    assert_eq!(4 * 4 * 2, actual.len());
+        assert_eq!(4 * 4 * 2, actual.len());
 
-    // Two partitions (from UNION) on the right
-    let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT t2_name FROM t2 UNION ALL SELECT t2_name FROM t2) AS t2";
-    let actual = execute(&ctx, sql).await;
+        // Two partitions (from UNION) on the right
+        let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT t2_name FROM t2 UNION ALL SELECT t2_name FROM t2) AS t2";
+        let actual = execute(&ctx, sql).await;
 
-    assert_eq!(4 * 4 * 2, actual.len());
+        assert_eq!(4 * 4 * 2, actual.len());
+    }
 }
 
 #[tokio::test]
 async fn cross_join_unbalanced() {
-    // the t1_id is larger than t2_id so the hash_build_probe_order optimizer should kick in
+    // the t1 table is larger than t2 so the join_selection optimizer should kick in
     let ctx = create_join_context_unbalanced("t1_id", "t2_id").unwrap();
 
     // the order of the values is not deterministic, so we need to sort to check the values
@@ -1414,350 +1494,497 @@ async fn hash_join_with_dictionary() -> Result<()> {
 
 #[tokio::test]
 async fn reduce_left_join_1() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-
-    // reduce to inner join
-    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id < 100";
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "      Filter: t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-        expected, actual
-    );
-    let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "| 11    | a       | 1      | 11    | z       | 3      |",
-        "| 22    | b       | 2      | 22    | y       | 1      |",
-        "| 44    | d       | 4      | 44    | x       | 3      |",
-        "+-------+---------+--------+-------+---------+--------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql =
+            "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id < 100";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Filter: t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 11    | z       | 3      |",
+            "| 22    | b       | 2      | 22    | y       | 1      |",
+            "| 44    | d       | 4      | 44    | x       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
 async fn reduce_left_join_2() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-
-    // reduce to inner join
-    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')";
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-
-    // filter expr:  `t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')`
-    // could be write to: `(t1.t1_int > 2 or t2.t2_int < 10) and (t2.t2_name != 'w' or t2.t2_int < 10)`
-    // the right part `(t2.t2_name != 'w' or t2.t2_int < 10)` could be push down left join side and remove in filter.
-
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Filter: t2.t2_int < UInt32(10) OR t1.t1_int > UInt32(2) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        Filter: t2.t2_int < UInt32(10) OR t2.t2_name != Utf8(\"w\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-        expected, actual
-    );
-    let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "| 11    | a       | 1      | 11    | z       | 3      |",
-        "| 22    | b       | 2      | 22    | y       | 1      |",
-        "| 44    | d       | 4      | 44    | x       | 3      |",
-        "+-------+---------+--------+-------+---------+--------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+
+        // filter expr: `t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')`
+        // can be rewritten as `(t1.t1_int > 2 or t2.t2_int < 10) and (t2.t2_name != 'w' or t2.t2_int < 10)`
+        // the right conjunct `(t2.t2_name != 'w' or t2.t2_int < 10)` only references t2,
+        // so it can be pushed down to the t2 scan and removed from the join filter.
+
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Filter: t2.t2_int < UInt32(10) OR t1.t1_int > UInt32(2) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "        Filter: t2.t2_int < UInt32(10) OR t2.t2_name != Utf8(\"w\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "          TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 11    | z       | 3      |",
+            "| 22    | b       | 2      | 22    | y       | 1      |",
+            "| 44    | d       | 4      | 44    | x       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
 async fn reduce_left_join_3() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-
-    // reduce subquery to inner join
-    let sql = "select * from (select t1.* from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 3) t3 left join t2 on t3.t1_int = t2.t2_int where t3.t1_id < 100";
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "          Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "            Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "              TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "            Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "              TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-        expected, actual
-    );
-    let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "| 22    | b       | 2      |       |         |        |",
-        "+-------+---------+--------+-------+---------+--------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce subquery to inner join
+        let sql = "select * from (select t1.* from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 3) t3 left join t2 on t3.t1_int = t2.t2_int where t3.t1_id < 100";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "        Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "          Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "            Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "              TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "            Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "              TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 22    | b       | 2      |       |         |        |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
 async fn reduce_right_join_1() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
-
-    // reduce to inner join
-    let sql = "select * from t1 right join t2 on t1.t1_id = t2.t2_id where t1.t1_int is not null";
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Filter: t1.t1_int IS NOT NULL [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "      TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-        expected, actual
-    );
-    let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "| 11    | a       | 1      | 11    | z       | 3      |",
-        "| 22    | b       | 2      | 22    | y       | 1      |",
-        "| 44    | d       | 4      | 44    | x       | 3      |",
-        "+-------+---------+--------+-------+---------+--------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 right join t2 on t1.t1_id = t2.t2_id where t1.t1_int is not null";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Filter: t1.t1_int IS NOT NULL [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 11    | z       | 3      |",
+            "| 22    | b       | 2      | 22    | y       | 1      |",
+            "| 44    | d       | 4      | 44    | x       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
 async fn reduce_right_join_2() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 right join t2 on t1.t1_id = t2.t2_id where not(t1.t1_int = t2.t2_int)";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Filter: t1.t1_int != t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 11    | z       | 3      |",
+            "| 22    | b       | 2      | 22    | y       | 1      |",
+            "| 44    | d       | 4      | 44    | x       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
 
-    // reduce to inner join
-    let sql = "select * from t1 right join t2 on t1.t1_id = t2.t2_id where not(t1.t1_int = t2.t2_int)";
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Filter: t1.t1_int != t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-        expected, actual
-    );
-    let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "| 11    | a       | 1      | 11    | z       | 3      |",
-        "| 22    | b       | 2      | 22    | y       | 1      |",
-        "| 44    | d       | 4      | 44    | x       | 3      |",
-        "+-------+---------+--------+-------+---------+--------+",
-    ];
+    Ok(())
+}
 
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
+#[tokio::test]
+async fn reduce_full_join_to_right_join() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to right join
+        let sql = "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t2.t2_name is not null";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Right Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Filter: t2.t2_name IS NOT NULL [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "|       |         |        | 55    | w       | 3      |",
+            "| 11    | a       | 1      | 11    | z       | 3      |",
+            "| 22    | b       | 2      | 22    | y       | 1      |",
+            "| 44    | d       | 4      | 44    | x       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
-async fn reduce_full_join_to_right_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+async fn reduce_full_join_to_left_join() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to left join
+        let sql =
+            "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t1.t1_name != 'b'";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Left Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Filter: t1.t1_name != Utf8(\"b\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 11    | z       | 3      |",
+            "| 33    | c       | 3      |       |         |        |",
+            "| 44    | d       | 4      | 44    | x       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
+    Ok(())
+}
 
-    // reduce to right join
-    let sql = "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t2.t2_name is not null";
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Right Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "      Filter: t2.t2_name IS NOT NULL [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+#[tokio::test]
+async fn reduce_full_join_to_inner_join() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t1.t1_name != 'b' and t2.t2_name = 'x'";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Filter: t1.t1_name != Utf8(\"b\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Filter: t2.t2_name = Utf8(\"x\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 44    | d       | 4      | 44    | x       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn sort_merge_equijoin() -> Result<()> {
+    let ctx = create_sort_merge_join_context("t1_id", "t2_id")?;
+    let equivalent_sql = [
+        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
     ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-        expected, actual
-    );
     let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "|       |         |        | 55    | w       | 3      |",
-        "| 11    | a       | 1      | 11    | z       | 3      |",
-        "| 22    | b       | 2      | 22    | y       | 1      |",
-        "| 44    | d       | 4      | 44    | x       | 3      |",
-        "+-------+---------+--------+-------+---------+--------+",
+        "+-------+---------+---------+",
+        "| t1_id | t1_name | t2_name |",
+        "+-------+---------+---------+",
+        "| 11    | a       | z       |",
+        "| 22    | b       | y       |",
+        "| 44    | d       | x       |",
+        "+-------+---------+---------+",
     ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
+    for sql in equivalent_sql.iter() {
+        let actual = execute_to_batches(&ctx, sql).await;
+        assert_batches_eq!(expected, &actual);
+    }
 
     Ok(())
 }
 
 #[tokio::test]
-async fn reduce_full_join_to_left_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+async fn sort_merge_join_on_date32() -> Result<()> {
+    let ctx = create_sort_merge_join_datatype_context()?;
 
-    // reduce to left join
-    let sql =
-        "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t1.t1_name != 'b'";
+    // inner sort merge join on a Date32 join key
+    let sql = "select * from t1 join t2 on t1.c1 = t2.c1";
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
     let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Left Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Filter: t1.t1_name != Utf8(\"b\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "      TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
+    let logical_plan = state.optimize(&plan)?;
+    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let expected = vec![
+        "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]",
+        "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c1\", index: 0 })]",
+        "    SortExec: [c1@0 ASC]",
+        "      CoalesceBatchesExec: target_batch_size=4096",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
+        "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "            MemoryExec: partitions=1, partition_sizes=[1]",
+        "    SortExec: [c1@0 ASC]",
+        "      CoalesceBatchesExec: target_batch_size=4096",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
+        "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "            MemoryExec: partitions=1, partition_sizes=[1]",
+    ];
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
     assert_eq!(
         expected, actual,
         "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
         expected, actual
     );
+
     let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "| 11    | a       | 1      | 11    | z       | 3      |",
-        "| 33    | c       | 3      |       |         |        |",
-        "| 44    | d       | 4      | 44    | x       | 3      |",
-        "+-------+---------+--------+-------+---------+--------+",
+        "+------------+------------+---------+-----+------------+------------+---------+-----+",
+        "| c1         | c2         | c3      | c4  | c1         | c2         | c3      | c4  |",
+        "+------------+------------+---------+-----+------------+------------+---------+-----+",
+        "| 1970-01-02 | 1970-01-02 | 1.23    | abc | 1970-01-02 | 1970-01-02 | -123.12 | abc |",
+        "| 1970-01-04 |            | -123.12 | jkl | 1970-01-04 |            | 789.00  |     |",
+        "+------------+------------+---------+-----+------------+------------+---------+-----+",
     ];
 
     let results = execute_to_batches(&ctx, sql).await;
     assert_batches_sorted_eq!(expected, &results);
+
     Ok(())
 }
 
 #[tokio::test]
-async fn reduce_full_join_to_inner_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+async fn sort_merge_join_on_decimal() -> Result<()> {
+    let ctx = create_sort_merge_join_datatype_context()?;
 
-    // reduce to inner join
-    let sql = "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t1.t1_name != 'b' and t2.t2_name = 'x'";
+    // right sort merge join on a Decimal128 join key
+    let sql = "select * from t1 right join t2 on t1.c3 = t2.c3";
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
     let state = ctx.state();
-    let plan = state.optimize(&plan)?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Filter: t1.t1_name != Utf8(\"b\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "      Filter: t2.t2_name = Utf8(\"x\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
+    let logical_plan = state.optimize(&plan)?;
+    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let expected = vec![
+        "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]",
+        "  SortMergeJoin: join_type=Right, on=[(Column { name: \"c3\", index: 2 }, Column { name: \"c3\", index: 2 })]",
+        "    SortExec: [c3@2 ASC]",
+        "      CoalesceBatchesExec: target_batch_size=4096",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2)",
+        "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "            MemoryExec: partitions=1, partition_sizes=[1]",
+        "    SortExec: [c3@2 ASC]",
+        "      CoalesceBatchesExec: target_batch_size=4096",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2)",
+        "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "            MemoryExec: partitions=1, partition_sizes=[1]",
+    ];
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
     assert_eq!(
         expected, actual,
         "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
         expected, actual
     );
+
     let expected = vec![
-        "+-------+---------+--------+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
-        "+-------+---------+--------+-------+---------+--------+",
-        "| 44    | d       | 4      | 44    | x       | 3      |",
-        "+-------+---------+--------+-------+---------+--------+",
+        "+------------+------------+---------+-----+------------+------------+-----------+---------+",
+        "| c1         | c2         | c3      | c4  | c1         | c2         | c3        | c4      |",
+        "+------------+------------+---------+-----+------------+------------+-----------+---------+",
+        "|            |            |         |     |            |            | 100000.00 | abcdefg |",
+        "|            |            |         |     |            | 1970-01-04 | 0.00      | qwerty  |",
+        "|            | 1970-01-04 | 789.00  | ghi | 1970-01-04 |            | 789.00    |         |",
+        "| 1970-01-04 |            | -123.12 | jkl | 1970-01-02 | 1970-01-02 | -123.12   | abc     |",
+        "+------------+------------+---------+-----+------------+------------+-----------+---------+",
     ];
 
     let results = execute_to_batches(&ctx, sql).await;
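
The rewritten join tests above all share one setup pattern: build a SessionContext whose
SessionConfig toggles the options this commit introduces. A minimal sketch of that pattern,
assuming `t1`/`t2` are registered as in the helpers added to tests/sql/mod.rs below (the
query itself is illustrative):

    use datafusion::config::OPT_PREFER_HASH_JOIN;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Prefer SortMergeJoin over HashJoin and allow repartitioned joins.
        let config = SessionConfig::new()
            .set_bool(OPT_PREFER_HASH_JOIN, false)
            .with_repartition_joins(true)
            .with_target_partitions(2);
        let ctx = SessionContext::with_config(config);

        // Assumes t1/t2 were registered beforehand, e.g. via ctx.register_batch(...).
        let df = ctx
            .sql("SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id")
            .await?;
        df.show().await?;
        Ok(())
    }
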
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 619270f20..81a48bd99 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -25,6 +25,7 @@ use arrow::{
 use chrono::prelude::*;
 use chrono::Duration;
 
+use datafusion::config::OPT_PREFER_HASH_JOIN;
 use datafusion::datasource::TableProvider;
 use datafusion::from_slice::FromSlice;
 use datafusion::logical_expr::{Aggregate, LogicalPlan, Projection, TableScan};
@@ -178,8 +179,14 @@ fn create_case_context() -> Result<SessionContext> {
     Ok(ctx)
 }
 
-fn create_join_context(column_left: &str, column_right: &str) -> Result<SessionContext> {
-    let ctx = SessionContext::new();
+fn create_join_context(
+    column_left: &str,
+    column_right: &str,
+    repartition_joins: bool,
+) -> Result<SessionContext> {
+    let ctx = SessionContext::with_config(
+        SessionConfig::new().with_repartition_joins(repartition_joins),
+    );
 
     let t1_schema = Arc::new(Schema::new(vec![
         Field::new(column_left, DataType::UInt32, true),
@@ -427,6 +434,131 @@ fn create_join_context_with_nulls() -> Result<SessionContext> {
     Ok(ctx)
 }
 
+fn create_sort_merge_join_context(
+    column_left: &str,
+    column_right: &str,
+) -> Result<SessionContext> {
+    let ctx = SessionContext::with_config(
+        SessionConfig::new().set_bool(OPT_PREFER_HASH_JOIN, false),
+    );
+
+    let t1_schema = Arc::new(Schema::new(vec![
+        Field::new(column_left, DataType::UInt32, true),
+        Field::new("t1_name", DataType::Utf8, true),
+        Field::new("t1_int", DataType::UInt32, true),
+    ]));
+    let t1_data = RecordBatch::try_new(
+        t1_schema,
+        vec![
+            Arc::new(UInt32Array::from_slice([11, 22, 33, 44])),
+            Arc::new(StringArray::from(vec![
+                Some("a"),
+                Some("b"),
+                Some("c"),
+                Some("d"),
+            ])),
+            Arc::new(UInt32Array::from_slice([1, 2, 3, 4])),
+        ],
+    )?;
+    ctx.register_batch("t1", t1_data)?;
+
+    let t2_schema = Arc::new(Schema::new(vec![
+        Field::new(column_right, DataType::UInt32, true),
+        Field::new("t2_name", DataType::Utf8, true),
+        Field::new("t2_int", DataType::UInt32, true),
+    ]));
+    let t2_data = RecordBatch::try_new(
+        t2_schema,
+        vec![
+            Arc::new(UInt32Array::from_slice([11, 22, 44, 55])),
+            Arc::new(StringArray::from(vec![
+                Some("z"),
+                Some("y"),
+                Some("x"),
+                Some("w"),
+            ])),
+            Arc::new(UInt32Array::from_slice([3, 1, 3, 3])),
+        ],
+    )?;
+    ctx.register_batch("t2", t2_data)?;
+
+    Ok(ctx)
+}
+
+fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
+    let ctx = SessionContext::with_config(
+        SessionConfig::new()
+            .set_bool(OPT_PREFER_HASH_JOIN, false)
+            .with_target_partitions(2),
+    );
+
+    let t1_schema = Schema::new(vec![
+        Field::new("c1", DataType::Date32, true),
+        Field::new("c2", DataType::Date64, true),
+        Field::new("c3", DataType::Decimal128(5, 2), true),
+        Field::new(
+            "c4",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        ),
+    ]);
+    let dict1: DictionaryArray<Int32Type> =
+        vec!["abc", "def", "ghi", "jkl"].into_iter().collect();
+    let t1_data = RecordBatch::try_new(
+        Arc::new(t1_schema),
+        vec![
+            Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])),
+            Arc::new(Date64Array::from(vec![
+                Some(86400000),
+                Some(172800000),
+                Some(259200000),
+                None,
+            ])),
+            Arc::new(
+                Decimal128Array::from_iter_values([123, 45600, 78900, -12312])
+                    .with_precision_and_scale(5, 2)
+                    .unwrap(),
+            ),
+            Arc::new(dict1),
+        ],
+    )?;
+    ctx.register_batch("t1", t1_data)?;
+
+    let t2_schema = Schema::new(vec![
+        Field::new("c1", DataType::Date32, true),
+        Field::new("c2", DataType::Date64, true),
+        Field::new("c3", DataType::Decimal128(10, 2), true),
+        Field::new(
+            "c4",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        ),
+    ]);
+    let dict2: DictionaryArray<Int32Type> =
+        vec!["abc", "abcdefg", "qwerty", ""].into_iter().collect();
+    let t2_data = RecordBatch::try_new(
+        Arc::new(t2_schema),
+        vec![
+            Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])),
+            Arc::new(Date64Array::from(vec![
+                Some(86400000),
+                None,
+                Some(259200000),
+                None,
+            ])),
+            Arc::new(
+                Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900])
+                    .with_precision_and_scale(10, 2)
+                    .unwrap(),
+            ),
+            Arc::new(dict2),
+        ],
+    )?;
+    ctx.register_batch("t2", t2_data)?;
+
+    Ok(ctx)
+}
+
 fn get_tpch_table_schema(table: &str) -> Schema {
     match table {
         "customer" => Schema::new(vec![
diff --git a/datafusion/core/tests/sql/references.rs b/datafusion/core/tests/sql/references.rs
index e63acc444..52a82f071 100644
--- a/datafusion/core/tests/sql/references.rs
+++ b/datafusion/core/tests/sql/references.rs
@@ -122,7 +122,7 @@ async fn qualified_table_references_and_fields() -> Result<()> {
 
 #[tokio::test]
 async fn test_partial_qualified_name() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
     let sql = "SELECT t1.t1_id, t1_name FROM public.t1";
     let expected = vec![
         "+-------+---------+",
diff --git a/datafusion/core/tests/sql/wildcard.rs b/datafusion/core/tests/sql/wildcard.rs
index d78d0456d..ddf50dfa8 100644
--- a/datafusion/core/tests/sql/wildcard.rs
+++ b/datafusion/core/tests/sql/wildcard.rs
@@ -89,7 +89,7 @@ async fn select_non_alias_qualified_wildcard() -> Result<()> {
 
 #[tokio::test]
 async fn select_qualified_wildcard_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
     let sql =
         "SELECT tb1.*, tb2.* FROM t1 tb1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id";
     let expected = vec![
@@ -111,7 +111,7 @@ async fn select_qualified_wildcard_join() -> Result<()> {
 
 #[tokio::test]
 async fn select_non_alias_qualified_wildcard_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
     let sql = "SELECT t1.*, tb2.* FROM t1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id";
     let expected = vec![
         "+-------+---------+--------+-------+---------+--------+",
diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md
index b8edeea87..51bc8e3ee 100644
--- a/datafusion/optimizer/README.md
+++ b/datafusion/optimizer/README.md
@@ -321,7 +321,7 @@ In the following example, the `type_coercion` and `simplify_expressions` passes
 |                                                            |   EmptyExec: produce_one_row=true                                         |
 |                                                            |                                                                           |
 | physical_plan after aggregate_statistics                   | SAME TEXT AS ABOVE                                                        |
-| physical_plan after hash_build_probe_order                 | SAME TEXT AS ABOVE                                                        |
+| physical_plan after join_selection                         | SAME TEXT AS ABOVE                                                        |
 | physical_plan after coalesce_batches                       | SAME TEXT AS ABOVE                                                        |
 | physical_plan after repartition                            | SAME TEXT AS ABOVE                                                        |
 | physical_plan after add_merge_exec                         | SAME TEXT AS ABOVE                                                        |
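
The README table above is taken from EXPLAIN VERBOSE output, so the renamed pass is easy
to verify against a live session. A hedged sketch (the query and table names are
illustrative, and `ctx` is assumed to be configured as in the test helpers):

    use datafusion::prelude::*;

    async fn show_optimizer_passes(ctx: &SessionContext) -> datafusion::error::Result<()> {
        // EXPLAIN VERBOSE prints the plan after every optimizer pass; the output
        // should now contain "physical_plan after join_selection" where it used to
        // show "physical_plan after hash_build_probe_order".
        let df = ctx
            .sql("EXPLAIN VERBOSE SELECT * FROM t1 JOIN t2 ON t1_id = t2_id")
            .await?;
        df.show().await
    }
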
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 1b82037d0..a42050c75 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -35,20 +35,23 @@ Values are parsed according to the [same rules used in casts from Utf8](https://
 If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted.
 Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions.
 
-| key                                             | type    | default | description                                                                                                                                                                                                                                                                                                                                                   |
-| ----------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| datafusion.catalog.location                     | Utf8    | NULL    | Location scanned to load tables for `default` schema, defaults to None                                                                                                                                                                                                                                                                                        |
-| datafusion.catalog.type                         | Utf8    | NULL    | Type of `TableProvider` to use when loading `default` schema. Defaults to None                                                                                                                                                                                                                                                                                |
-| datafusion.execution.batch_size                 | UInt64  | 8192    | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption.                                                                                                                                                                         |
-| datafusion.execution.coalesce_batches           | Boolean | true    | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
-| datafusion.execution.coalesce_target_batch_size | UInt64  | 4096    | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'.                                                                                                                                                                                                                        |
-| datafusion.execution.parquet.enable_page_index  | Boolean | false   | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                                                                  |
-| datafusion.execution.parquet.pushdown_filters   | Boolean | false   | If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. |
-| datafusion.execution.parquet.reorder_filters    | Boolean | false   | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. |
-| datafusion.execution.time_zone                  | Utf8    | +00:00  | The session time zone, which some functions require; e.g. EXTRACT(HOUR FROM SOME_TIME) shifts the underlying datetime according to this time zone and then extracts the hour. |
-| datafusion.explain.logical_plan_only            | Boolean | false   | When set to true, the explain statement will only print logical plans.                                                                                                                                                                                                                                                                                        |
-| datafusion.explain.physical_plan_only           | Boolean | false   | When set to true, the explain statement will only print physical plans.                                                                                                                                                                                                                                                                                       |
-| datafusion.optimizer.filter_null_join_keys      | Boolean | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                                               |
-| datafusion.optimizer.max_passes                 | UInt64  | 3       | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                                                                          |
-| datafusion.optimizer.skip_failed_rules          | Boolean | true    | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail.                                                                                                                         |
+| key                                                       | type    | default | description                                                                                                                                                                                                                                                                                                                                                   |
+| --------------------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| datafusion.catalog.location                               | Utf8    | NULL    | Location scanned to load tables for `default` schema, defaults to None                                                                                                                                                                                                                                                                                        |
+| datafusion.catalog.type                                   | Utf8    | NULL    | Type of `TableProvider` to use when loading `default` schema. Defaults to None                                                                                                                                                                                                                                                                                |
+| datafusion.execution.batch_size                           | UInt64  | 8192    | Default batch size while creating new batches; it's especially useful for buffered in-memory batches, since creating tiny batches would result in too much metadata memory consumption. |
+| datafusion.execution.coalesce_batches                     | Boolean | true    | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
+| datafusion.execution.coalesce_target_batch_size           | UInt64  | 4096    | Target batch size when coalescing batches. Used in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. |
+| datafusion.execution.parquet.enable_page_index            | Boolean | false   | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                                                                  |
+| datafusion.execution.parquet.pushdown_filters             | Boolean | false   | If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. |
+| datafusion.execution.parquet.reorder_filters              | Boolean | false   | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. |
+| datafusion.execution.time_zone                            | Utf8    | +00:00  | The session time zone, which some functions require; e.g. EXTRACT(HOUR FROM SOME_TIME) shifts the underlying datetime according to this time zone and then extracts the hour. |
+| datafusion.explain.logical_plan_only                      | Boolean | false   | When set to true, the explain statement will only print logical plans.                                                                                                                                                                                                                                                                                        |
+| datafusion.explain.physical_plan_only                     | Boolean | false   | When set to true, the explain statement will only print physical plans.                                                                                                                                                                                                                                                                                       |
+| datafusion.optimizer.filter_null_join_keys                | Boolean | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                                               |
+| datafusion.optimizer.hash_join_single_partition_threshold | UInt64  | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin to be collected into a single partition                                                                                                                                                                                                                                                   |
+| datafusion.optimizer.max_passes                           | UInt64  | 3       | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                                                                          |
+| datafusion.optimizer.prefer_hash_join                     | Boolean | true    | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory. Defaults to true                                                                                                                                                                        |
+| datafusion.optimizer.skip_failed_rules                    | Boolean | true    | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail.                                                                                                                         |
+| datafusion.optimizer.top_down_join_key_reordering         | Boolean | true    | When set to true, the physical plan optimizer will run a top-down process to reorder the join keys. Defaults to true                                                                                                                                                                                                                                          |
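
As a quick illustration (not part of the commit itself), here is a minimal sketch of how the two new join-selection options might be set programmatically, assuming the builder-style `SessionConfig::set_bool`/`set_u64` setters and `SessionContext::with_config` available in this version of DataFusion:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Minimal sketch: override the new join-selection options on a session.
// The option keys match the configs.md table above.
fn make_context() -> SessionContext {
    let config = SessionConfig::new()
        // Prefer SortMergeJoin over HashJoin (trades CPU for lower memory use).
        .set_bool("datafusion.optimizer.prefer_hash_join", false)
        // When a HashJoin is still chosen, collect an input side into a
        // single partition only if its estimated size is at most 1 MiB.
        .set_u64(
            "datafusion.optimizer.hash_join_single_partition_threshold",
            1024 * 1024,
        );
    SessionContext::with_config(config)
}
```

Depending on the DataFusion version, the same keys may also be adjustable at runtime via SQL, e.g. `SET datafusion.optimizer.prefer_hash_join = false;`.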