You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/11 15:10:56 UTC

(arrow-datafusion) branch main updated: Minor: Improve comments in EnforceDistribution tests (#8474)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1861c3d42b Minor: Improve comments in EnforceDistribution tests (#8474)
1861c3d42b is described below

commit 1861c3d42bbb0c4caa9c5a61c65065b87b32aa35
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Dec 11 10:10:50 2023 -0500

    Minor: Improve comments in EnforceDistribution tests (#8474)
---
 .../src/physical_optimizer/enforce_distribution.rs | 34 ++++++++++++++--------
 1 file changed, 22 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 4befea741c..3aed6555f3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -256,7 +256,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
 /// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
 ///    Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
 ///    Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
-///    Requirements can be satisfied by adjusting keys ordering, clear the current requiements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
+///    Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
 ///
 /// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
 ///    Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
@@ -928,7 +928,7 @@ fn add_roundrobin_on_top(
         // If any of the following conditions is true
         // - Preserving ordering is not helpful in terms of satisfying ordering requirements
         // - Usage of order preserving variants is not desirable
-        // (determined by flag `config.optimizer.bounded_order_preserving_variants`)
+        // (determined by flag `config.optimizer.prefer_existing_sort`)
         let partitioning = Partitioning::RoundRobinBatch(n_target);
         let repartition =
             RepartitionExec::try_new(input, partitioning)?.with_preserve_order();
@@ -996,7 +996,7 @@ fn add_hash_on_top(
         // - Preserving ordering is not helpful in terms of satisfying ordering
         //   requirements.
         // - Usage of order preserving variants is not desirable (per the flag
-        //   `config.optimizer.bounded_order_preserving_variants`).
+        //   `config.optimizer.prefer_existing_sort`).
         let mut new_plan = if repartition_beneficial_stats {
             // Since hashing benefits from partitioning, add a round-robin repartition
             // before it:
@@ -1045,7 +1045,7 @@ fn add_spm_on_top(
         // If any of the following conditions is true
         // - Preserving ordering is not helpful in terms of satisfying ordering requirements
         // - Usage of order preserving variants is not desirable
-        // (determined by flag `config.optimizer.bounded_order_preserving_variants`)
+        // (determined by flag `config.optimizer.prefer_existing_sort`)
         let should_preserve_ordering = input.output_ordering().is_some();
         let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering {
             let existing_ordering = input.output_ordering().unwrap_or(&[]);
@@ -2026,7 +2026,7 @@ pub(crate) mod tests {
     fn ensure_distribution_helper(
         plan: Arc<dyn ExecutionPlan>,
         target_partitions: usize,
-        bounded_order_preserving_variants: bool,
+        prefer_existing_sort: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let distribution_context = DistributionContext::new(plan);
         let mut config = ConfigOptions::new();
@@ -2034,7 +2034,7 @@ pub(crate) mod tests {
         config.optimizer.enable_round_robin_repartition = false;
         config.optimizer.repartition_file_scans = false;
         config.optimizer.repartition_file_min_size = 1024;
-        config.optimizer.prefer_existing_sort = bounded_order_preserving_variants;
+        config.optimizer.prefer_existing_sort = prefer_existing_sort;
         ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
     }
 
@@ -2056,23 +2056,33 @@ pub(crate) mod tests {
     }
 
     /// Runs the repartition optimizer and asserts the plan against the expected
+    /// # Arguments
+    /// * `EXPECTED_LINES` - Expected output plan
+    /// * `PLAN` - Input plan
+    /// * `FIRST_ENFORCE_DIST` - selects the order in which the rules are run:
+    ///   * `true`: EnforceDistribution, EnforceDistribution, EnforceSorting
+    ///   * `false`: EnforceSorting, EnforceDistribution, EnforceDistribution
+    /// * `PREFER_EXISTING_SORT` (optional, default `false`) - if true, will not repartition / resort data if it is already sorted
+    /// * `TARGET_PARTITIONS` (optional, default `10`) - number of partitions to repartition to
+    /// * `REPARTITION_FILE_SCANS` (optional, default `false`) - if true, will repartition file scans
+    /// * `REPARTITION_FILE_MIN_SIZE` (optional, default `1024`) - minimum file size to repartition
     macro_rules! assert_optimized {
         ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
             assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
         };
 
-        ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => {
-            assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024);
+        ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
+            assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
         };
 
-        ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
+        ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
             let mut config = ConfigOptions::new();
             config.execution.target_partitions = $TARGET_PARTITIONS;
             config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
             config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
-            config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS;
+            config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
 
             // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
             //       because they were written prior to the separation of `BasicEnforcement` into
@@ -3294,7 +3304,7 @@ pub(crate) mod tests {
         ];
         assert_optimized!(expected, exec, true);
         // In this case preserving ordering through order preserving operators is not desirable
-        // (according to flag: bounded_order_preserving_variants)
+        // (according to flag: `config.optimizer.prefer_existing_sort`)
         // hence in this case ordering lost during CoalescePartitionsExec and re-introduced with
         // SortExec at the top.
         let expected = &[
@@ -4341,7 +4351,7 @@ pub(crate) mod tests {
             "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
         ];
 
-        // last flag sets config.optimizer.bounded_order_preserving_variants
+        // last flag sets config.optimizer.prefer_existing_sort
         assert_optimized!(expected, physical_plan.clone(), true, true);
         assert_optimized!(expected, physical_plan, false, true);