You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/20 16:35:06 UTC
(arrow-datafusion) branch main updated: Add hash_join_single_partition_threshold_rows config (#8720)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f5a97d58d4 Add hash_join_single_partition_threshold_rows config (#8720)
f5a97d58d4 is described below
commit f5a97d58d484e93c2f79c5b624b10bd9e75c45f0
Author: Eugene Marushchenko <ma...@gmail.com>
AuthorDate: Sun Jan 21 02:35:00 2024 +1000
Add hash_join_single_partition_threshold_rows config (#8720)
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/common/src/config.rs | 4 +
.../core/src/physical_optimizer/join_selection.rs | 284 ++++++++++++---------
.../sqllogictest/test_files/information_schema.slt | 2 +
docs/source/user-guide/configs.md | 1 +
4 files changed, 165 insertions(+), 126 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index e00c179308..eb516f97a4 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -561,6 +561,10 @@ config_namespace! {
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
+ /// The maximum estimated size in rows for one input side of a HashJoin
+ /// will be collected into a single partition
+ pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
+
/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index ba66dca55b..f9b9fdf85c 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -87,9 +87,10 @@ fn should_swap_join_order(
}
}
-fn supports_collect_by_size(
+fn supports_collect_by_thresholds(
plan: &dyn ExecutionPlan,
- collection_size_threshold: usize,
+ threshold_byte_size: usize,
+ threshold_num_rows: usize,
) -> bool {
// Currently we do not trust the 0 value from stats, due to stats collection might have bug
// TODO check the logic in datasource::get_statistics_with_limit()
@@ -97,10 +98,10 @@ fn supports_collect_by_size(
return false;
};
- if let Some(size) = stats.total_byte_size.get_value() {
- *size != 0 && *size < collection_size_threshold
- } else if let Some(row_count) = stats.num_rows.get_value() {
- *row_count != 0 && *row_count < collection_size_threshold
+ if let Some(byte_size) = stats.total_byte_size.get_value() {
+ *byte_size != 0 && *byte_size < threshold_byte_size
+ } else if let Some(num_rows) = stats.num_rows.get_value() {
+ *num_rows != 0 && *num_rows < threshold_num_rows
} else {
false
}
@@ -251,9 +252,14 @@ impl PhysicalOptimizerRule for JoinSelection {
// - We will also swap left and right sides for cross joins so that the left
// side is the small side.
let config = &config.optimizer;
- let collect_left_threshold = config.hash_join_single_partition_threshold;
+ let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
+ let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
state.plan.transform_up(&|plan| {
- statistical_join_selection_subrule(plan, collect_left_threshold)
+ statistical_join_selection_subrule(
+ plan,
+ collect_threshold_byte_size,
+ collect_threshold_num_rows,
+ )
})
}
@@ -270,8 +276,8 @@ impl PhysicalOptimizerRule for JoinSelection {
///
/// This function will first consider the given join type and check whether the
/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
-/// When the `collect_threshold` is provided, this function will also check left
-/// and right sizes.
+/// When the `ignore_threshold` is false, this function will also check left
+/// and right sizes in bytes or rows.
///
/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
@@ -279,7 +285,9 @@ impl PhysicalOptimizerRule for JoinSelection {
/// and [`JoinType::RightAnti`], respectively.
fn try_collect_left(
hash_join: &HashJoinExec,
- collect_threshold: Option<usize>,
+ ignore_threshold: bool,
+ threshold_byte_size: usize,
+ threshold_num_rows: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let left = hash_join.left();
let right = hash_join.right();
@@ -291,9 +299,14 @@ fn try_collect_left(
| JoinType::LeftSemi
| JoinType::Right
| JoinType::RightSemi
- | JoinType::RightAnti => collect_threshold.map_or(true, |threshold| {
- supports_collect_by_size(&**left, threshold)
- }),
+ | JoinType::RightAnti => {
+ ignore_threshold
+ || supports_collect_by_thresholds(
+ &**left,
+ threshold_byte_size,
+ threshold_num_rows,
+ )
+ }
};
let right_can_collect = match join_type {
JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
@@ -301,9 +314,14 @@ fn try_collect_left(
| JoinType::RightSemi
| JoinType::Left
| JoinType::LeftSemi
- | JoinType::LeftAnti => collect_threshold.map_or(true, |threshold| {
- supports_collect_by_size(&**right, threshold)
- }),
+ | JoinType::LeftAnti => {
+ ignore_threshold
+ || supports_collect_by_thresholds(
+ &**right,
+ threshold_byte_size,
+ threshold_num_rows,
+ )
+ }
};
match (left_can_collect, right_can_collect) {
(true, true) => {
@@ -366,52 +384,56 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl
/// optimize hash and cross joins in the plan according to available statistical information.
fn statistical_join_selection_subrule(
plan: Arc<dyn ExecutionPlan>,
- collect_left_threshold: usize,
+ collect_threshold_byte_size: usize,
+ collect_threshold_num_rows: usize,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
- let transformed = if let Some(hash_join) =
- plan.as_any().downcast_ref::<HashJoinExec>()
- {
- match hash_join.partition_mode() {
- PartitionMode::Auto => {
- try_collect_left(hash_join, Some(collect_left_threshold))?.map_or_else(
- || partitioned_hash_join(hash_join).map(Some),
- |v| Ok(Some(v)),
+ let transformed =
+ if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+ match hash_join.partition_mode() {
+ PartitionMode::Auto => try_collect_left(
+ hash_join,
+ false,
+ collect_threshold_byte_size,
+ collect_threshold_num_rows,
)?
- }
- PartitionMode::CollectLeft => try_collect_left(hash_join, None)?
.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
)?,
- PartitionMode::Partitioned => {
- let left = hash_join.left();
- let right = hash_join.right();
- if should_swap_join_order(&**left, &**right)?
- && supports_swap(*hash_join.join_type())
- {
- swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)?
- } else {
- None
+ PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
+ .map_or_else(
+ || partitioned_hash_join(hash_join).map(Some),
+ |v| Ok(Some(v)),
+ )?,
+ PartitionMode::Partitioned => {
+ let left = hash_join.left();
+ let right = hash_join.right();
+ if should_swap_join_order(&**left, &**right)?
+ && supports_swap(*hash_join.join_type())
+ {
+ swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)?
+ } else {
+ None
+ }
}
}
- }
- } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
- let left = cross_join.left();
- let right = cross_join.right();
- if should_swap_join_order(&**left, &**right)? {
- let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
- // TODO avoid adding ProjectionExec again and again, only adding Final Projection
- let proj: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
- swap_reverting_projection(&left.schema(), &right.schema()),
- Arc::new(new_join),
- )?);
- Some(proj)
+ } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
+ let left = cross_join.left();
+ let right = cross_join.right();
+ if should_swap_join_order(&**left, &**right)? {
+ let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
+ // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+ let proj: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+ swap_reverting_projection(&left.schema(), &right.schema()),
+ Arc::new(new_join),
+ )?);
+ Some(proj)
+ } else {
+ None
+ }
} else {
None
- }
- } else {
- None
- };
+ };
Ok(if let Some(transformed) = transformed {
Transformed::Yes(transformed)
@@ -682,22 +704,62 @@ mod tests_statistical {
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
+ /// Return statistics for empty table
+ fn empty_statistics() -> Statistics {
+ Statistics {
+ num_rows: Precision::Absent,
+ total_byte_size: Precision::Absent,
+ column_statistics: vec![ColumnStatistics::new_unknown()],
+ }
+ }
+
+ /// Get table thresholds: (num_rows, byte_size)
+ fn get_thresholds() -> (usize, usize) {
+ let optimizer_options = ConfigOptions::new().optimizer;
+ (
+ optimizer_options.hash_join_single_partition_threshold_rows,
+ optimizer_options.hash_join_single_partition_threshold,
+ )
+ }
+
+ /// Return statistics for small table
+ fn small_statistics() -> Statistics {
+ let (threshold_num_rows, threshold_byte_size) = get_thresholds();
+ Statistics {
+ num_rows: Precision::Inexact(threshold_num_rows / 128),
+ total_byte_size: Precision::Inexact(threshold_byte_size / 128),
+ column_statistics: vec![ColumnStatistics::new_unknown()],
+ }
+ }
+
+ /// Return statistics for big table
+ fn big_statistics() -> Statistics {
+ let (threshold_num_rows, threshold_byte_size) = get_thresholds();
+ Statistics {
+ num_rows: Precision::Inexact(threshold_num_rows * 2),
+ total_byte_size: Precision::Inexact(threshold_byte_size * 2),
+ column_statistics: vec![ColumnStatistics::new_unknown()],
+ }
+ }
+
+ /// Return statistics for bigger table
+ fn bigger_statistics() -> Statistics {
+ let (threshold_num_rows, threshold_byte_size) = get_thresholds();
+ Statistics {
+ num_rows: Precision::Inexact(threshold_num_rows * 4),
+ total_byte_size: Precision::Inexact(threshold_byte_size * 4),
+ column_statistics: vec![ColumnStatistics::new_unknown()],
+ }
+ }
+
fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
let big = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Inexact(10),
- total_byte_size: Precision::Inexact(100000),
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
+ big_statistics(),
Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
));
let small = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Inexact(100000),
- total_byte_size: Precision::Inexact(10),
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
+ small_statistics(),
Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
));
(big, small)
@@ -821,11 +883,11 @@ mod tests_statistical {
assert_eq!(
swapped_join.left().statistics().unwrap().total_byte_size,
- Precision::Inexact(10)
+ Precision::Inexact(8192)
);
assert_eq!(
swapped_join.right().statistics().unwrap().total_byte_size,
- Precision::Inexact(100000)
+ Precision::Inexact(2097152)
);
}
@@ -872,11 +934,11 @@ mod tests_statistical {
assert_eq!(
swapped_join.left().statistics().unwrap().total_byte_size,
- Precision::Inexact(100000)
+ Precision::Inexact(2097152)
);
assert_eq!(
swapped_join.right().statistics().unwrap().total_byte_size,
- Precision::Inexact(10)
+ Precision::Inexact(8192)
);
}
@@ -917,11 +979,11 @@ mod tests_statistical {
assert_eq!(
swapped_join.left().statistics().unwrap().total_byte_size,
- Precision::Inexact(10)
+ Precision::Inexact(8192)
);
assert_eq!(
swapped_join.right().statistics().unwrap().total_byte_size,
- Precision::Inexact(100000)
+ Precision::Inexact(2097152)
);
assert_eq!(original_schema, swapped_join.schema());
@@ -1032,11 +1094,11 @@ mod tests_statistical {
assert_eq!(
swapped_join.left().statistics().unwrap().total_byte_size,
- Precision::Inexact(10)
+ Precision::Inexact(8192)
);
assert_eq!(
swapped_join.right().statistics().unwrap().total_byte_size,
- Precision::Inexact(100000)
+ Precision::Inexact(2097152)
);
}
@@ -1078,29 +1140,17 @@ mod tests_statistical {
#[tokio::test]
async fn test_join_selection_collect_left() {
let big = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Inexact(10000000),
- total_byte_size: Precision::Inexact(10000000),
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
+ big_statistics(),
Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
));
let small = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Inexact(10),
- total_byte_size: Precision::Inexact(10),
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
+ small_statistics(),
Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
));
let empty = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Absent,
- total_byte_size: Precision::Absent,
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
+ empty_statistics(),
Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]),
));
@@ -1121,7 +1171,7 @@ mod tests_statistical {
Column::new_with_schema("small_col", &small.schema()).unwrap(),
)];
check_join_partition_mode(
- big,
+ big.clone(),
small.clone(),
join_on,
true,
@@ -1145,8 +1195,8 @@ mod tests_statistical {
Column::new_with_schema("small_col", &small.schema()).unwrap(),
)];
check_join_partition_mode(
- empty,
- small,
+ empty.clone(),
+ small.clone(),
join_on,
true,
PartitionMode::CollectLeft,
@@ -1155,52 +1205,40 @@ mod tests_statistical {
#[tokio::test]
async fn test_join_selection_partitioned() {
- let big1 = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Inexact(10000000),
- total_byte_size: Precision::Inexact(10000000),
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
- Schema::new(vec![Field::new("big_col1", DataType::Int32, false)]),
+ let bigger = Arc::new(StatisticsExec::new(
+ bigger_statistics(),
+ Schema::new(vec![Field::new("bigger_col", DataType::Int32, false)]),
));
- let big2 = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Inexact(20000000),
- total_byte_size: Precision::Inexact(20000000),
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
- Schema::new(vec![Field::new("big_col2", DataType::Int32, false)]),
+ let big = Arc::new(StatisticsExec::new(
+ big_statistics(),
+ Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
));
let empty = Arc::new(StatisticsExec::new(
- Statistics {
- num_rows: Precision::Absent,
- total_byte_size: Precision::Absent,
- column_statistics: vec![ColumnStatistics::new_unknown()],
- },
+ empty_statistics(),
Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]),
));
let join_on = vec![(
- Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
- Column::new_with_schema("big_col2", &big2.schema()).unwrap(),
+ Column::new_with_schema("big_col", &big.schema()).unwrap(),
+ Column::new_with_schema("bigger_col", &bigger.schema()).unwrap(),
)];
check_join_partition_mode(
- big1.clone(),
- big2.clone(),
+ big.clone(),
+ bigger.clone(),
join_on,
false,
PartitionMode::Partitioned,
);
let join_on = vec![(
- Column::new_with_schema("big_col2", &big2.schema()).unwrap(),
- Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+ Column::new_with_schema("bigger_col", &bigger.schema()).unwrap(),
+ Column::new_with_schema("big_col", &big.schema()).unwrap(),
)];
check_join_partition_mode(
- big2,
- big1.clone(),
+ bigger.clone(),
+ big.clone(),
join_on,
true,
PartitionMode::Partitioned,
@@ -1208,27 +1246,21 @@ mod tests_statistical {
let join_on = vec![(
Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
- Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+ Column::new_with_schema("big_col", &big.schema()).unwrap(),
)];
check_join_partition_mode(
empty.clone(),
- big1.clone(),
+ big.clone(),
join_on,
false,
PartitionMode::Partitioned,
);
let join_on = vec![(
- Column::new_with_schema("big_col1", &big1.schema()).unwrap(),
+ Column::new_with_schema("big_col", &big.schema()).unwrap(),
Column::new_with_schema("empty_col", &empty.schema()).unwrap(),
)];
- check_join_partition_mode(
- big1,
- empty,
- join_on,
- false,
- PartitionMode::Partitioned,
- );
+ check_join_partition_mode(big, empty, join_on, false, PartitionMode::Partitioned);
}
fn check_join_partition_mode(
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index b37b78ab6d..768292d3d4 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -196,6 +196,7 @@ datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
+datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_hash_join true
@@ -272,6 +273,7 @@ datafusion.optimizer.enable_round_robin_repartition true When set to true, the p
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
+datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index a812b74284..7a7460799b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -101,6 +101,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys [...]
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory [...]
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition [...]
+| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition [...]
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). [...]
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans [...]
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans [...]