You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/20 16:35:06 UTC

(arrow-datafusion) branch main updated: Add hash_join_single_partition_threshold_rows config (#8720)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f5a97d58d4 Add hash_join_single_partition_threshold_rows config (#8720)
f5a97d58d4 is described below

commit f5a97d58d484e93c2f79c5b624b10bd9e75c45f0
Author: Eugene Marushchenko <ma...@gmail.com>
AuthorDate: Sun Jan 21 02:35:00 2024 +1000

    Add hash_join_single_partition_threshold_rows config (#8720)
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/common/src/config.rs                    |   4 +
 .../core/src/physical_optimizer/join_selection.rs  | 284 ++++++++++++---------
 .../sqllogictest/test_files/information_schema.slt |   2 +
 docs/source/user-guide/configs.md                  |   1 +
 4 files changed, 165 insertions(+), 126 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index e00c179308..eb516f97a4 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -561,6 +561,10 @@ config_namespace! {
         /// will be collected into a single partition
         pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
 
+        /// The maximum estimated size in rows for one input side of a HashJoin
+        /// will be collected into a single partition
+        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
+
         /// The default filter selectivity used by Filter Statistics
         /// when an exact selectivity cannot be determined. Valid values are
         /// between 0 (no selectivity) and 100 (all rows are selected).
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index ba66dca55b..f9b9fdf85c 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -87,9 +87,10 @@ fn should_swap_join_order(
     }
 }
 
-fn supports_collect_by_size(
+fn supports_collect_by_thresholds(
     plan: &dyn ExecutionPlan,
-    collection_size_threshold: usize,
+    threshold_byte_size: usize,
+    threshold_num_rows: usize,
 ) -> bool {
     // Currently we do not trust the 0 value from stats, due to stats collection might have bug
     // TODO check the logic in datasource::get_statistics_with_limit()
@@ -97,10 +98,10 @@ fn supports_collect_by_size(
         return false;
     };
 
-    if let Some(size) = stats.total_byte_size.get_value() {
-        *size != 0 && *size < collection_size_threshold
-    } else if let Some(row_count) = stats.num_rows.get_value() {
-        *row_count != 0 && *row_count < collection_size_threshold
+    if let Some(byte_size) = stats.total_byte_size.get_value() {
+        *byte_size != 0 && *byte_size < threshold_byte_size
+    } else if let Some(num_rows) = stats.num_rows.get_value() {
+        *num_rows != 0 && *num_rows < threshold_num_rows
     } else {
         false
     }
@@ -251,9 +252,14 @@ impl PhysicalOptimizerRule for JoinSelection {
         // - We will also swap left and right sides for cross joins so that the left
         //   side is the small side.
         let config = &config.optimizer;
-        let collect_left_threshold = config.hash_join_single_partition_threshold;
+        let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
+        let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
         state.plan.transform_up(&|plan| {
-            statistical_join_selection_subrule(plan, collect_left_threshold)
+            statistical_join_selection_subrule(
+                plan,
+                collect_threshold_byte_size,
+                collect_threshold_num_rows,
+            )
         })
     }
 
@@ -270,8 +276,8 @@ impl PhysicalOptimizerRule for JoinSelection {
 ///
 /// This function will first consider the given join type and check whether the
 /// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
-/// When the `collect_threshold` is provided, this function will also check left
-/// and right sizes.
+/// When `ignore_threshold` is false, this function will also check left
+/// and right sizes in bytes or rows.
 ///
 /// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
 /// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
@@ -279,7 +285,9 @@ impl PhysicalOptimizerRule for JoinSelection {
 /// and [`JoinType::RightAnti`], respectively.
 fn try_collect_left(
     hash_join: &HashJoinExec,
-    collect_threshold: Option<usize>,
+    ignore_threshold: bool,
+    threshold_byte_size: usize,
+    threshold_num_rows: usize,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
     let left = hash_join.left();
     let right = hash_join.right();
@@ -291,9 +299,14 @@ fn try_collect_left(
         | JoinType::LeftSemi
         | JoinType::Right
         | JoinType::RightSemi
-        | JoinType::RightAnti => collect_threshold.map_or(true, |threshold| {
-            supports_collect_by_size(&**left, threshold)
-        }),
+        | JoinType::RightAnti => {
+            ignore_threshold
+                || supports_collect_by_thresholds(
+                    &**left,
+                    threshold_byte_size,
+                    threshold_num_rows,
+                )
+        }
     };
     let right_can_collect = match join_type {
         JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
@@ -301,9 +314,14 @@ fn try_collect_left(
         | JoinType::RightSemi
         | JoinType::Left
         | JoinType::LeftSemi
-        | JoinType::LeftAnti => collect_threshold.map_or(true, |threshold| {
-            supports_collect_by_size(&**right, threshold)
-        }),
+        | JoinType::LeftAnti => {
+            ignore_threshold
+                || supports_collect_by_thresholds(
+                    &**right,
+                    threshold_byte_size,
+                    threshold_num_rows,
+                )
+        }
     };
     match (left_can_collect, right_can_collect) {
         (true, true) => {
@@ -366,52 +384,56 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl
 /// optimize hash and cross joins in the plan according to available statistical information.
 fn statistical_join_selection_subrule(
     plan: Arc<dyn ExecutionPlan>,
-    collect_left_threshold: usize,
+    collect_threshold_byte_size: usize,
+    collect_threshold_num_rows: usize,
 ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
-    let transformed = if let Some(hash_join) =
-        plan.as_any().downcast_ref::<HashJoinExec>()
-    {
-        match hash_join.partition_mode() {
-            PartitionMode::Auto => {
-                try_collect_left(hash_join, Some(collect_left_threshold))?.map_or_else(
-                    || partitioned_hash_join(hash_join).map(Some),
-                    |v| Ok(Some(v)),
+    let transformed =
+        if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+            match hash_join.partition_mode() {
+                PartitionMode::Auto => try_collect_left(
+                    hash_join,
+                    false,
+                    collect_threshold_byte_size,
+                    collect_threshold_num_rows,
                 )?
-            }
-            PartitionMode::CollectLeft => try_collect_left(hash_join, None)?
                 .map_or_else(
                     || partitioned_hash_join(hash_join).map(Some),
                     |v| Ok(Some(v)),
                 )?,
-            PartitionMode::Partitioned => {
-                let left = hash_join.left();
-                let right = hash_join.right();
-                if should_swap_join_order(&**left, &**right)?
-                    && supports_swap(*hash_join.join_type())
-                {
-                    swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)?
-                } else {
-                    None
+                PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
+                    .map_or_else(
+                        || partitioned_hash_join(hash_join).map(Some),
+                        |v| Ok(Some(v)),
+                    )?,
+                PartitionMode::Partitioned => {
+                    let left = hash_join.left();
+                    let right = hash_join.right();
+                    if should_swap_join_order(&**left, &**right)?
+                        && supports_swap(*hash_join.join_type())
+                    {
+                        swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)?
+                    } else {
+                        None
+                    }
                 }
             }
-        }
-    } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
-        let left = cross_join.left();
-        let right = cross_join.right();
-        if should_swap_join_order(&**left, &**right)? {
-            let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
-            // TODO avoid adding ProjectionExec again and again, only adding Final Projection
-            let proj: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
-                swap_reverting_projection(&left.schema(), &right.schema()),
-                Arc::new(new_join),
-            )?);
-            Some(proj)
+        } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
+            let left = cross_join.left();
+            let right = cross_join.right();
+            if should_swap_join_order(&**left, &**right)? {
+                let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
+                // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+                let proj: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+                    swap_reverting_projection(&left.schema(), &right.schema()),
+                    Arc::new(new_join),
+                )?);
+                Some(proj)
+            } else {
+                None
+            }
         } else {
             None
-        }
-    } else {
-        None
-    };
+        };
 
     Ok(if let Some(transformed) = transformed {
         Transformed::Yes(transformed)
@@ -682,22 +704,62 @@ mod tests_statistical {
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_expr::PhysicalExpr;
 
+    /// Return statistics for empty table
+    fn empty_statistics() -> Statistics {
+        Statistics {
+            num_rows: Precision::Absent,
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![ColumnStatistics::new_unknown()],
+        }
+    }
+
+    /// Get table thresholds: (num_rows, byte_size)
+    fn get_thresholds() -> (usize, usize) {
+        let optimizer_options = ConfigOptions::new().optimizer;
+        (
+            optimizer_options.hash_join_single_partition_threshold_rows,
+            optimizer_options.hash_join_single_partition_threshold,
+        )
+    }
+
+    /// Return statistics for small table
+    fn small_statistics() -> Statistics {
+        let (threshold_num_rows, threshold_byte_size) = get_thresholds();
+        Statistics {
+            num_rows: Precision::Inexact(threshold_num_rows / 128),
+            total_byte_size: Precision::Inexact(threshold_byte_size / 128),
+            column_statistics: vec![ColumnStatistics::new_unknown()],
+        }
+    }
+
+    /// Return statistics for big table
+    fn big_statistics() -> Statistics {
+        let (threshold_num_rows, threshold_byte_size) = get_thresholds();
+        Statistics {
+            num_rows: Precision::Inexact(threshold_num_rows * 2),
+            total_byte_size: Precision::Inexact(threshold_byte_size * 2),
+            column_statistics: vec![ColumnStatistics::new_unknown()],
+        }
+    }
+
+    /// Return statistics for bigger table
+    fn bigger_statistics() -> Statistics {
+        let (threshold_num_rows, threshold_byte_size) = get_thresholds();
+        Statistics {
+            num_rows: Precision::Inexact(threshold_num_rows * 4),
+            total_byte_size: Precision::Inexact(threshold_byte_size * 4),
+            column_statistics: vec![ColumnStatistics::new_unknown()],
+        }
+    }
+
     fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
         let big = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Inexact(10),
-                total_byte_size: Precision::Inexact(100000),
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
+            big_statistics(),
             Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
         ));
 
         let small = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Inexact(100000),
-                total_byte_size: Precision::Inexact(10),
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
+            small_statistics(),
             Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
         ));
         (big, small)
@@ -821,11 +883,11 @@ mod tests_statistical {
 
         assert_eq!(
             swapped_join.left().statistics().unwrap().total_byte_size,
-            Precision::Inexact(10)
+            Precision::Inexact(8192)
         );
         assert_eq!(
             swapped_join.right().statistics().unwrap().total_byte_size,
-            Precision::Inexact(100000)
+            Precision::Inexact(2097152)
         );
     }
 
@@ -872,11 +934,11 @@ mod tests_statistical {
 
         assert_eq!(
             swapped_join.left().statistics().unwrap().total_byte_size,
-            Precision::Inexact(100000)
+            Precision::Inexact(2097152)
         );
         assert_eq!(
             swapped_join.right().statistics().unwrap().total_byte_size,
-            Precision::Inexact(10)
+            Precision::Inexact(8192)
         );
     }
 
@@ -917,11 +979,11 @@ mod tests_statistical {
 
             assert_eq!(
                 swapped_join.left().statistics().unwrap().total_byte_size,
-                Precision::Inexact(10)
+                Precision::Inexact(8192)
             );
             assert_eq!(
                 swapped_join.right().statistics().unwrap().total_byte_size,
-                Precision::Inexact(100000)
+                Precision::Inexact(2097152)
             );
 
             assert_eq!(original_schema, swapped_join.schema());
@@ -1032,11 +1094,11 @@ mod tests_statistical {
 
         assert_eq!(
             swapped_join.left().statistics().unwrap().total_byte_size,
-            Precision::Inexact(10)
+            Precision::Inexact(8192)
         );
         assert_eq!(
             swapped_join.right().statistics().unwrap().total_byte_size,
-            Precision::Inexact(100000)
+            Precision::Inexact(2097152)
         );
     }
 
@@ -1078,29 +1140,17 @@ mod tests_statistical {
     #[tokio::test]
     async fn test_join_selection_collect_left() {
         let big = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Inexact(10000000),
-                total_byte_size: Precision::Inexact(10000000),
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
+            big_statistics(),
             Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
         ));
 
         let small = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Inexact(10),
-                total_byte_size: Precision::Inexact(10),
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
+            small_statistics(),
             Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
         ));
 
         let empty = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Absent,
-                total_byte_size: Precision::Absent,
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
+            empty_statistics(),
             Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]),
         ));
 
@@ -1121,7 +1171,7 @@ mod tests_statistical {
             Column::new_with_schema("small_col", &small.schema()).unwrap(),
         )];
         check_join_partition_mode(
-            big,
+            big.clone(),
             small.clone(),
             join_on,
             true,
@@ -1145,8 +1195,8 @@ mod tests_statistical {
             Column::new_with_schema("small_col", &small.schema()).unwrap(),
         )];
         check_join_partition_mode(
-            empty,
-            small,
+            empty.clone(),
+            small.clone(),
             join_on,
             true,
             PartitionMode::CollectLeft,
@@ -1155,52 +1205,40 @@ mod tests_statistical {
 
     #[tokio::test]
     async fn test_join_selection_partitioned() {
-        let big1 = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Inexact(10000000),
-                total_byte_size: Precision::Inexact(10000000),
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
-            Schema::new(vec![Field::new("big_col1", DataType::Int32, false)]),
+        let bigger = Arc::new(StatisticsExec::new(
+            bigger_statistics(),
+            Schema::new(vec![Field::new("bigger_col", DataType::Int32, false)]),
         ));
 
-        let big2 = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Inexact(20000000),
-                total_byte_size: Precision::Inexact(20000000),
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
-            Schema::new(vec![Field::new("big_col2", DataType::Int32, false)]),
+        let big = Arc::new(StatisticsExec::new(
+            big_statistics(),
+            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
         ));
 
         let empty = Arc::new(StatisticsExec::new(
-            Statistics {
-                num_rows: Precision::Absent,
-                total_byte_size: Precision::Absent,
-                column_statistics: vec![ColumnStatistics::new_unknown()],
-            },
+            empty_statistics(),
             Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]),
         ));
 
         let join_on = vec![(
-            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
-            Column::new_with_schema("big_col2", &big2.schema()).unwrap(),
+            Column::new_with_schema("big_col", &big.schema()).unwrap(),
+            Column::new_with_schema("bigger_col", &bigger.schema()).unwrap(),
         )];
         check_join_partition_mode(
-            big1.clone(),
-            big2.clone(),
+            big.clone(),
+            bigger.clone(),
             join_on,
             false,
             PartitionMode::Partitioned,
         );
 
         let join_on = vec![(
-            Column::new_with_schema("big_col2", &big2.schema()).unwrap(),
-            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+            Column::new_with_schema("bigger_col", &bigger.schema()).unwrap(),
+            Column::new_with_schema("big_col", &big.schema()).unwrap(),
         )];
         check_join_partition_mode(
-            big2,
-            big1.clone(),
+            bigger.clone(),
+            big.clone(),
             join_on,
             true,
             PartitionMode::Partitioned,
@@ -1208,27 +1246,21 @@ mod tests_statistical {
 
         let join_on = vec![(
             Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
-            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+            Column::new_with_schema("big_col", &big.schema()).unwrap(),
         )];
         check_join_partition_mode(
             empty.clone(),
-            big1.clone(),
+            big.clone(),
             join_on,
             false,
             PartitionMode::Partitioned,
         );
 
         let join_on = vec![(
-            Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+            Column::new_with_schema("big_col", &big.schema()).unwrap(),
             Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
         )];
-        check_join_partition_mode(
-            big1,
-            empty,
-            join_on,
-            false,
-            PartitionMode::Partitioned,
-        );
+        check_join_partition_mode(big, empty, join_on, false, PartitionMode::Partitioned);
     }
 
     fn check_join_partition_mode(
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index b37b78ab6d..768292d3d4 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -196,6 +196,7 @@ datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.enable_topk_aggregation true
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
+datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
 datafusion.optimizer.max_passes 3
 datafusion.optimizer.prefer_existing_sort false
 datafusion.optimizer.prefer_hash_join true
@@ -272,6 +273,7 @@ datafusion.optimizer.enable_round_robin_repartition true When set to true, the p
 datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
 datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
 datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
+datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
 datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
 datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec`  and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
 datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index a812b74284..7a7460799b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -101,6 +101,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.optimizer.top_down_join_key_reordering                       | true                      | When set to true, the physical plan optimizer will run a top down process to reorder the join keys                                                                                                                                                                                                                                                                                                   [...]
 | datafusion.optimizer.prefer_hash_join                                   | true                      | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory                                                                                                                                                                                                                                [...]
 | datafusion.optimizer.hash_join_single_partition_threshold               | 1048576                   | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                                                                                                       [...]
+| datafusion.optimizer.hash_join_single_partition_threshold_rows          | 131072                    | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                                                                                                        [...]
 | datafusion.optimizer.default_filter_selectivity                         | 20                        | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).                                                                                                                                                                                                                [...]
 | datafusion.explain.logical_plan_only                                    | false                     | When set to true, the explain statement will only print logical plans                                                                                                                                                                                                                                                                                                                                [...]
 | datafusion.explain.physical_plan_only                                   | false                     | When set to true, the explain statement will only print physical plans                                                                                                                                                                                                                                                                                                                               [...]