You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/11 15:10:56 UTC
(arrow-datafusion) branch main updated: Minor: Improve comments in EnforceDistribution tests (#8474)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1861c3d42b Minor: Improve comments in EnforceDistribution tests (#8474)
1861c3d42b is described below
commit 1861c3d42bbb0c4caa9c5a61c65065b87b32aa35
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Dec 11 10:10:50 2023 -0500
Minor: Improve comments in EnforceDistribution tests (#8474)
---
.../src/physical_optimizer/enforce_distribution.rs | 34 ++++++++++++++--------
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 4befea741c..3aed6555f3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -256,7 +256,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
/// Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
/// Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
-/// Requirements can be satisfied by adjusting keys ordering, clear the current requiements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
+/// Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
///
/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
/// Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
@@ -928,7 +928,7 @@ fn add_roundrobin_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
- // (determined by flag `config.optimizer.bounded_order_preserving_variants`)
+ // (determined by flag `config.optimizer.prefer_existing_sort`)
let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition =
RepartitionExec::try_new(input, partitioning)?.with_preserve_order();
@@ -996,7 +996,7 @@ fn add_hash_on_top(
// - Preserving ordering is not helpful in terms of satisfying ordering
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
- // `config.optimizer.bounded_order_preserving_variants`).
+ // `config.optimizer.prefer_existing_sort`).
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
@@ -1045,7 +1045,7 @@ fn add_spm_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
- // (determined by flag `config.optimizer.bounded_order_preserving_variants`)
+ // (determined by flag `config.optimizer.prefer_existing_sort`)
let should_preserve_ordering = input.output_ordering().is_some();
let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering {
let existing_ordering = input.output_ordering().unwrap_or(&[]);
@@ -2026,7 +2026,7 @@ pub(crate) mod tests {
fn ensure_distribution_helper(
plan: Arc<dyn ExecutionPlan>,
target_partitions: usize,
- bounded_order_preserving_variants: bool,
+ prefer_existing_sort: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let distribution_context = DistributionContext::new(plan);
let mut config = ConfigOptions::new();
@@ -2034,7 +2034,7 @@ pub(crate) mod tests {
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
- config.optimizer.prefer_existing_sort = bounded_order_preserving_variants;
+ config.optimizer.prefer_existing_sort = prefer_existing_sort;
ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
}
@@ -2056,23 +2056,33 @@ pub(crate) mod tests {
}
/// Runs the repartition optimizer and asserts the plan against the expected
+ /// # Arguments
+ /// * `EXPECTED_LINES` - Expected output plan
+ /// * `PLAN` - Input plan
+ /// * `FIRST_ENFORCE_DIST` -
+ /// true: runs (EnforceDistribution, EnforceDistribution, EnforceSorting)
+ /// false: runs (EnforceSorting, EnforceDistribution, EnforceDistribution)
+ /// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / re-sort data if it is already sorted
+ /// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
+ /// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
+ /// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
};
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => {
- assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024);
+ ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
+ assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
};
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
+ ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
- config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS;
+ config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
@@ -3294,7 +3304,7 @@ pub(crate) mod tests {
];
assert_optimized!(expected, exec, true);
// In this case preserving ordering through order preserving operators is not desirable
- // (according to flag: bounded_order_preserving_variants)
+ // (according to flag: `config.optimizer.prefer_existing_sort`)
// hence in this case ordering lost during CoalescePartitionsExec and re-introduced with
// SortExec at the top.
let expected = &[
@@ -4341,7 +4351,7 @@ pub(crate) mod tests {
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
- // last flag sets config.optimizer.bounded_order_preserving_variants
+ // last flag sets config.optimizer.prefer_existing_sort
assert_optimized!(expected, physical_plan.clone(), true, true);
assert_optimized!(expected, physical_plan, false, true);