You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/02/03 09:37:24 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

mustafasrepo opened a new pull request, #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes [#5100](https://github.com/apache/arrow-datafusion/issues/5100).
   
   # Rationale for this change
   
   <!--
   Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
   Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.
   -->
   We want to have the ability to leverage parallelism in sorts when appropriate, which requires `EnforceSorting` to be partition aware. We also want to further decouple `EnforceSorting` and `EnforceDistribution` rules, with the end goal being complete orthogonality. Currently, there is coupling in both directions: In order to produce valid plans, one needs to apply the `EnforceDistribution` rule before applying `EnforceSorting`, and the `EnforceDistribution` breaks sorting requirements and produces invalid plans on its own.
   
   With this PR, coupling is broken in one direction (`EnforceSorting` does not require `EnforceDistribution` to work correctly anymore), but not the other way yet (`EnforceDistribution` may still invalidate sort correctness, so one still needs to call `EnforceSorting` after that). Note that it is on our TODO list to fix the other direction too.
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   This PR adds a couple of functionalities to the `EnforceSorting` rule. 
   
   - Adds parallelize option to the`EnforceSorting` rule. If this flag is enabled. Physical plans of the form
       
       ```rust
       "SortExec: [t1_id@0 ASC NULLS LAST]",
       "  CoalescePartitionsExec",
       "    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
       ```
       
       will turn into the plan below:
       
       ```rust
       "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
       "  SortExec: [t1_id@0 ASC NULLS LAST]",
       "    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
       ```
       
       This increases speed in multi-threaded environments. You can see time comparisons below:
       
       | n_row | distinct | batch_size | Repartition Mode(8) | Rule On (mean) | Rule On(Median) | Rule Off (mean) | Rule Off(Median) |
       | --- | --- | --- | --- | --- | --- | --- | --- |
       | 1000 | 100 | 100 | RoundRobin | 3.015032ms | 2.988125ms | 5.213412ms | 4.515166ms |
       | 1000 | 100 | 100 | Hash | 6.00907ms | 5.895166ms | 10.636287ms | 10.3025ms |
       | 10_000 | 100 | 100 | RoundRobin | 24.429487ms | 24.042291ms | 84.282599ms | 84.117083ms |
       | 10_000 | 100 | 100 | Hash | 48.779278ms | 48.970708ms | 517.195637ms | 520.449625ms |
       | 20_000 | 100 | 100 | RoundRobin | 47.297341ms | 47.141041ms | 289.02502ms | 290.58625ms |
       | 20_000 | 100 | 100 | Hash | 114.230287ms | 107.842291ms | 1.992879062s | 2.002725333s |
       | 50_000 | 100 | 100 | RoundRobin | 117.185937ms | 115.300583ms | 1.604551912s | 1.609479958s |
       | 50_000 | 100 | 100 | Hash | 425.755595ms | 430.346625ms | 12.683566324s | 12.502681458s |
   - Issue [#5100](https://github.com/apache/arrow-datafusion/issues/5100) is closed, fixing the limit bug.
   - The `EnforceSorting` rule now also considers `SortPreservingMergeExec`s when performing optimizations. This enables us to optimize more queries.
   - We handle cases where a `UnionExec` partially maintains the ordering of some of its children, such as the case below:
       
       ```rust
       "  UnionExec",
       "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
       "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
       "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
       ```
       
       In this case, output ordering of `UnionExec` is `nullable_col@0 ASC`. It maintains the ordering of its second child and partially maintains the ordering of its first child. The `EnforceSorting` rule now considers such partially-order-maintaining paths during optimization. 
       
   - In order to handle multi-children operators like `UnionExec`s properly, the assumption that ordering only comes from a single path in the physical plan tree is removed. Now ordering can come from multiple paths.
   
   # Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   We have added multiple tests to cover above mentioned functionalities. Approximately 550 lines of the changes come from test or test utils.
   
   # Are there any user-facing changes?
   
   No.
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099992951


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+

Review Comment:
   Moved these to init method.  Thanks for the suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1419220457

   Thank you for this @mustafasrepo  -- looking really nice so far


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099932928


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+
+/// This object is used within the [EnforceSorting] rule to track the closest
+/// `CoalescePartitionsExec` descendant(s) for every child of a plan.
+#[derive(Debug, Clone)]
+struct PlanWithCorrespondingCoalescePartitions {
+    plan: Arc<dyn ExecutionPlan>,
+    // For every child, keep a subtree of `ExecutionPlan`s starting from the
+    // child until the `CoalescePartitionsExec`(s) -- could be multiple for
+    // n-ary plans like Union -- that affect the output partitioning of the
+    // child. If the child has no connection to any `CoalescePartitionsExec`,
+    // simpliy store None (and not a subtree).
+    coalesce_onwards: Vec<Option<ExecTree>>,
+}
+
+impl PlanWithCorrespondingCoalescePartitions {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards: vec![None; length],
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| PlanWithCorrespondingCoalescePartitions::new(child))
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithCorrespondingCoalescePartitions {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_requirements = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            let children_plans = children_requirements
                 .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input ordering. If we are at
-                        // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && !element.is_empty() {
-                                return element.clone();
-                            }
+                .map(|item| item.plan.clone())
+                .collect();
+            let coalesce_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec`
+                    // operators. This tree collects all the intermediate executors that
+                    // maintain a single partition. If we just saw a `CoalescePartitionsExec`
+                    // operator, we reset the tree and start accumulating.
+                    let plan = item.plan;
+                    if plan.as_any().is::<CoalescePartitionsExec>() {
+                        Some(ExecTree {
+                            idx,
+                            plan,
+                            children: vec![],
+                        })
+                    } else if plan.children().is_empty() {
+                        // Plan has no children, there is nothing to propagate.
+                        None
+                    } else {
+                        let children = item
+                            .coalesce_onwards
+                            .into_iter()
+                            .flatten()
+                            .filter(|item| {
+                                // Only consider operators that don't require a
+                                // single partition.
+                                !matches!(
+                                    plan.required_input_distribution()[item.idx],
+                                    Distribution::SinglePartition
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        if children.is_empty() {
+                            None
+                        } else {
+                            Some(ExecTree {
+                                idx,
+                                plan,
+                                children,
+                            })
                         }
                     }
-                    vec![]
                 })
-                .collect::<Vec<_>>();
+                .collect();
             let plan = with_new_children_if_necessary(self.plan, children_plans)?;
-            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+            Ok(PlanWithCorrespondingCoalescePartitions {
+                plan,
+                coalesce_onwards,
+            })
         }
     }
 }
 

Review Comment:
   Same as above, suggest to move the construction of `coalesce_onwards` to a `init` method of `PlanWithCorrespondingCoalescePartitions` and keep the `map_children()` simple



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421115470

   
   > 
   > By the way, after thinking about this change. I found out that `GlobalSortSelection` approach doesn't always accomplish parallelization. Consider query below
   > 
   > ```sql
   > SELECT c1, \
   >     SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
   >     SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
   >     FROM aggregate_test_100 ORDER BY c1 ASC
   > ```
   > 
   > Its physical plan, is as follows in the current version
   > 
   > ```sql
   > "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   > "  SortExec: [c1@0 ASC NULLS LAST]",
   > "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   > "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   > "        SortExec: [c9@1 ASC NULLS LAST]",
   > "          CoalescePartitionsExec",
   > "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   > "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   > "                CoalesceBatchesExec: target_batch_size=8192",
   > "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   > "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   > ```
   > 
   > However, previously rule would produce the plan below
   > 
   > ```sql
   > "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   > "  SortExec: [c1@0 ASC NULLS LAST]",
   > "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   > "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   > "        SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
   > "          SortExec: [c9@1 ASC NULLS LAST]",
   > "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   > "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   > "                CoalesceBatchesExec: target_batch_size=8192",
   > "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   > "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   > ```
   > 
   > tomorrow, I will go back to first approach. And will add this example as a test.
   
   I think this is because that global sort  + `CoalescePartitionsExec` were added later by the two enforcement rules.
   An easy way to get ride from this is to run the `GlobalSortSelection` rule again after the two enforcement rules. I would prefer still let the `GlobalSortSelection` rule handle this optimization. Need to be enhance `GlobalSortSelection` rule to handle the  SortExec + CoalescePartitionsExec combination.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1419753813

   > After this merges with the new functionality, he plans to propose a top-down refactoring that simplifies the code without losing the functionality -- which I am super enthusiastic to see.
   
   I agree this sounds great
   
   For anyone else following along, I think the discussion @ozankabak  is referring to is on https://github.com/synnada-ai/arrow-datafusion/pull/43


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421646020

   > Obviously, this is only a deduction from the experience so far. A refactor PR that simplifies the code using a non-joint approach while still passing all the unit tests can easily prove me wrong 🤣
   
   Yes I agree that having an existing implementation and test suite is ideal -- if / when we find better ways to express the algorithm that still does the same behavior, we can always update the code to be more beautiful


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099381357


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_with_global_limit() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(1);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 1)";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]",
+            "  AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "    AggregateExec: mode=Partial, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "      GlobalLimitExec: skip=0, fetch=1",
+            "        SortExec: [c13@0 ASC NULLS LAST]",
+            "          ProjectionExec: expr=[c13@0 as c13]",

Review Comment:
   I agree, let's do this in a follow-on PR!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1100041045


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -184,225 +431,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        requirements.plan.as_any().downcast_ref::<WindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering information propagates with
-                    //       necessary lineage information, compare `sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }

Review Comment:
   You are right, However, we want to decrease rule dependency as much as possible. With this check, `EnforceSorting` rule doesn't require `EnforceDistribution` rule to be called before it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1097831935


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -33,48 +33,80 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::add_sort_above_child;
 use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{reverse_sort_options, DataFusionError};
 use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
-use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use itertools::izip;
+use itertools::{concat, izip};
 use std::iter::zip;
 use std::sync::Arc;
 
 /// This rule inspects SortExec's in the given physical plan and removes the
-/// ones it can prove unnecessary.
+/// ones it can prove unnecessary. The boolean flag `parallelize_sorts`
+/// indicates whether we elect to transform CoalescePartitionsExec + SortExec
+/// cascades into SortExec + SortPreservingMergeExec cascades, which enables
+/// us to perform sorting in parallel.
 #[derive(Default)]
-pub struct EnforceSorting {}
+pub struct EnforceSorting {
+    parallelize_sorts: bool,
+}
 
 impl EnforceSorting {
     #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(parallelize_sorts: bool) -> Self {

Review Comment:
   We will modify the tests to verify both cases, thank you for pointing it out. I also agree with making this flag a configuration parameter, will do that too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421006216

   > > > > @alamb @ozankabak @mustafasrepo
   > > > > Regarding the global sort replaced to a parallel version(SortPreservingMergeExec + Local Sort) optimization, I think there is already a rule `GlobalSortSelection` for the exact purpose. I think we should not let the Sort Enforcement rule to handle this again. Implement/enhance such optimization in the `GlobalSortSelection` rule is more straightforward and do not need to care the positions of the `CoalescePartitionsExec`.
   > > > 
   > > > 
   > > > I am not sure how we can do all the local sort + merge substitutions just with `GlobalSortSelection`, which doesn't track coalesce operations on partitions as you rightly point out. Note that we handle (and parallel-optimize) not just top level sorts, but sorts at any depth within the plan, even with intermediate executors in between the coalesce operation and the sort in question.
   > > > We will take a deeper look today and see if we can move over the logic to `GlobalSortSelection` while still preserving the same functionality. If we can, great -- if not, we will share an example that blocks this. Thank you for the suggestion 👍
   > > 
   > > 
   > > Yes, please take a look at the `GlobalSortSelection` rule. This rule does not need to care about the position of `CoalescePartitionsExec` because `CoalescePartitionsExec`s are added by `EnforceDistribution` rule which is triggered after the `GlobalSortSelection` rule. The physical Sort Selection should happen in a very early stage of the physical optimization phase. I guess why the current `GlobalSortSelection` does not optimize all the `Global Sort` is because it is not that aggressive and has an additional check. If you comment that check, all the `Global Sort` should be replaced.
   > > `&& sort_exec.fetch().is_some()`
   > 
   > @mingmwang your suggestion works. This greatly simplifies the code. Thanks for the suggestion. By the way I didn't remove `&& sort_exec.fetch().is_some()` check directly. I `OR`ed this check with the config option (can be found [here](https://github.com/synnada-ai/arrow-datafusion/blob/6db42485b016ef0acb183cb70e391bfc6910d4f9/datafusion/core/src/physical_optimizer/global_sort_selection.rs#L59)). In case, one wants to toggle this feature. As you, and @alamb say in some contexts this may not be what users want.
   
   By the way, after thinking about this change. I found out that new approach doesn't always accomplish parallelization. Consider query below
   ```sql
   SELECT c1, \
       SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
       SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
       FROM aggregate_test_100 ORDER BY c1 ASC
   ```
   Its physical plan, is as follows in the current version
   ```sql
   "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   "  SortExec: [c1@0 ASC NULLS LAST]",
   "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   "        SortExec: [c9@1 ASC NULLS LAST]",
   "          CoalescePartitionsExec",
   "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   "                CoalesceBatchesExec: target_batch_size=8192",
   "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   ```
   However, previously rule would produce the plan below
   ```sql
   "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   "  SortExec: [c1@0 ASC NULLS LAST]",
   "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   "        SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
   "          SortExec: [c9@1 ASC NULLS LAST]",
   "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   "                CoalesceBatchesExec: target_batch_size=8192",
   "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   ```
   tomorrow, I will retract the changes. And will add this example as a test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420625925

   @mustafasrepo 
   
   I found there was bug in the `EnforceSorting` rule, it was not introduced by this PR, the rule might change the final output ordering of the plan and will affect the correctness of the query result.
   
   Please take a look at the below test case and SQL
   
   `test_window_agg_sort_reversed_plan`
   
   SQL
   ```
   SELECT
       c9,
       SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
       SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
       FROM aggregate_test_100
       LIMIT 5
   ```
   
   The optimized logical plan:
   
   ```
   Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
     Limit: skip=0, fetch=5
       WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
         WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
           TableScan: aggregate_test_100 projection=[c9]
   ```
   
   The optimized physical plan:
   ```
   ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2],
     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1,
       GlobalLimitExec: skip=0, fetch=5,
         BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }],
           BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }],
             SortExec: [c9@0 DESC], global=true
   ```
   
   Based on the logical plan, the final plan's output ordering should be `ORDER BY C9 ASC` and the `LIMIT 5` should gives the least 5 items. But after the physical optimization, the final plan's output ordering becomes  `ORDER BY C9 DESC`  and the `LIMIT 5` gives the top 5 items.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098584451


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -775,6 +1187,133 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort2 = sort_exec(sort_exprs.clone(), spm);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort3 = sort_exec(sort_exprs, spm2);
+        let physical_plan = repartition_exec(repartition_exec(sort3));
+
+        let expected_input = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "            SortExec: [non_nullable_col@1 ASC]",
+            "              MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort3() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition_exec = repartition_exec(spm);
+        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let physical_plan = aggregate_exec(spm2);
+
+        // When removing a `SortPreservingMergeExec`, make sure that partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "          SortExec: [non_nullable_col@1 ASC]",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_do_not_remove_sort_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+        let limit = local_limit_exec(sort);
+        let limit = global_limit_exec(limit);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, limit]);
+        let repartition = repartition_exec(union);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, repartition);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      GlobalLimitExec: skip=0, fetch=100",
+            "        LocalLimitExec: fetch=100",
+            "          SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "            ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+
+        // We should keep the bottom `SortExec`.
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "      UnionExec",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        GlobalLimitExec: skip=0, fetch=100",
+            "          LocalLimitExec: fetch=100",
+            "            SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "              ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]

Review Comment:
   Based on the current bottom-up approach, I think it is quite hard to separate them or there will be duplicate code as well. At the very beginning, they were separated actually, local sort adding was done in the original `BasicEnforcement` rule and sort removing/optimization was done in this rule.  
   
   I had almost finished a top-down version of `EnforceSorting` rule, some UT results are failed and not consistent with the current UT results, not sure which one is the expected, need to check further. I believe top-down is more safe and powerful, it can handle the case like below and only a Sort('a', 'b', 'c') will be added and do not need to remove redundant/unnecessary Sort added during the process .
   
   ```
   Top plan required('a', 'b', 'c')
        Middle plan required('a')
           Bottom plan required('a', 'b') 
   ```
   With the top-down approach, it is also safe to handle the Window expr's reverse ordering optimization and it prefers to keep the most top sort requirements and will not break the final plan's output ordering. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1419639880

   Thanks for reviewing! We will address your reviews and finalize this tomorrow. FYI, @mingmwang was kind enough to go through this PR in our fork, and has given us some examples to test the changes on.
   
   After this merges with the new functionality, he plans to propose a top-down refactoring that simplifies the code without losing the functionality -- which I am super enthusiastic to see.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099378094


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",

Review Comment:
   Yes, this is an unrelated behavior which only makes sense when cost of hashing is significant. It is on our roadmap to make `EnforceDistribution` smarter, maybe we can touch on this within that scope and make a single multi-threaded hash repartitioner that does achieves the same purpose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421965394

   Thanks @mustafasrepo for this impressive improvement. I'm also very interested in this part and will review this PR carefully today. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1097831067


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1980,8 +1980,8 @@ async fn left_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",

Review Comment:
   It replaces coalesce + a single sort with multiple (parallel) sorts + merge, which is a better choice in multi-threaded environments with an efficient merge operator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098266601


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,34 +134,170 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
-                .collect::<Vec<_>>();
+                .map(|item| item.plan.clone())
+                .collect();
             let sort_onwards = children_requirements
-                .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input ordering. If we are at
-                        // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && !element.is_empty() {
-                                return element.clone();
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` tree are sort-introducing operators
+                    // (e.g `SortExec`, `SortPreservingMergeExec`). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let is_union = plan.as_any().is::<UnionExec>();
+                    // If the executor is a `UnionExec`, and it has an output ordering;
+                    // then it at least partially maintains some child's output ordering.
+                    // Therefore, we propagate this information upwards.
+                    let partially_maintains =
+                        is_union && plan.output_ordering().is_some();

Review Comment:
   I have changed code according to your suggestion. It now uses properties of the plan. Thanks for the suggestion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421593187

   > Regarding the rule applying ordering, since DataFusion optimization framework is still a traditional heuristic style framework, the rule applying orders always matter, we can not assume one rule can work independently without the others.
   
   > Specifically, the `EnforceDistribution` rule is responsible for handling the global distribution requirements.
   And EnforceSorting rule is responsible for handling the local sort requirements. It's also responsible for removing
   unnecessary global sort and local sort. The global distribution requirements need to be handled first, after that we can handle the local sort(inner-partition) requirements.
   
   
   Thank you @mingmwang -- I think part of what is confusing here is that two different things are happening as "optimization" passes.
   
   1. "Fixing up the plan for correctness" (aka "EnforceSorting"), which I think is a very similar at a high level to what the [TypeCoercion](https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/type_coercion.rs) logical optimizer rule does (coerces types in expressions so they are compatible even if that was not the case in the input plan)
   2. "Keep the same semantics of the plan, but rewrite it for better performance" (aka GlobalSortSelection / OptimizeSorts)
   
   I think @liukun4515  helped the logical optimizer greatly by identifying this difference, and pulling all the type coercion to the beginning of the optimizer passes ([source link](https://github.com/apache/arrow-datafusion/blob/e222bd627b6e7974133364fed4600d74b4da6811/datafusion/optimizer/src/optimizer.rs#L207)). We probably could have gone farther and made it clear that the TypeCoercion pass is not an optimizer but rather required for correctness. 
   
   Maybe such clarity in this case could help too


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420458485

   > I agree that fixing partitioning (global) and then sorting (local) is the more intuitive order, but this does not seem strictly necessary to me in theory. I can imagine changing global properties while still preserving the previous local properties for every partition (in the new plan). I think such a behavior would make rules very robust and easy to reason with. The current PR is not really about this anyway, but that's my general line of thinking when we refer to orthogonality.
   > 
   > Nevertheless, maybe you are aware of a fundamental issue (that I am not foreseeing right now) which makes this impossible. Or, maybe doing this has some other negative consequences. If that turns out to be the case, then we will go with the current status quo, of course.
   
   If we implement the `RepartitionExec`/`CoalescePartitionsExec` with the sort preserving merge, we can make the two rules more orthogonally. But compared with the current random merge,  sort preserving merge is not always a performance gain, especially in a distributed environment(like Ballista). We can discuss this in other tickets/issues.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo closed pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo closed pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations
URL: https://github.com/apache/arrow-datafusion/pull/5171


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099970375


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -184,225 +431,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        requirements.plan.as_any().downcast_ref::<WindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering information propagates with
-                    //       necessary lineage information, compare `sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }

Review Comment:
   Could you please explain the purpose of these lines of code ? The original code just remove the `SortExec`.  Looks like the new change try to handle a case that the current `SortExec`  is a global Sort and the  sort_input is actually `local Sort`, instead of removing the `global Sort`, replace it with a  `SortPreservingMergeExec`.  But I think we should not see such physical plan tree. This is because for a global Sort,  after `EnforceDistribution` rule, a `CoalescePartitionsExec` will be added as the input of that global Sort, and `CoalescePartitionsExec`  can not propagate any sort properties. the `ordering_satisfy` check will become false. So I think we do not need specific handling for global Sort here. Please correct me if I am wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1423310854

   Thanks all -- this was a great and epic collaboration. I love it. Onwards and upwards


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421451352

   ```sql
    SELECT
       c9,
       SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
       SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
       FROM aggregate_test_100
       LIMIT 5
   ```
   
   I think the output of this query is "implementation defined" according to the SQL spec -- it is correct to produce any arbitrary 5 rows out because there is no `ORDER BY` on the outer query. If the user wants a specific set of rows they ned to add an explicit `ORDER BY`.  
   
   ```sql
    SELECT
       c9,
       SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
       SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
       FROM aggregate_test_100
       ORDER BY c9 ASC
       LIMIT 5
   ```
   
   This is a pretty bad UX choice on the part of SQL in my opinion, and causes all sorts of confusion (e.g. when test output changes from run to run!) but that is what the SQL spec says. 
   
   > I don't think this is a bug. 
   
   I agree. I think it is acceptable for the optimizer to rewrite the plan so the output happens to be different


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420632377

   
   > > Yes, please take a look at the `GlobalSortSelection` rule. This rule does not need to care about the position of `CoalescePartitionsExec` because `CoalescePartitionsExec`s are added by `EnforceDistribution` rule which is triggered after the `GlobalSortSelection` rule. The physical Sort Selection should happen in a very early stage of the physical optimization phase. I guess why the current `GlobalSortSelection` does not optimize all the `Global Sort` is because it is not that aggressive and has an additional check. If you comment that check, all the `Global Sort` should be replaced.
   > > `&& sort_exec.fetch().is_some()`
   > 
   > @mingmwang your suggestion works. This greatly simplifies the code. Thanks for the suggestion. By the way I didn't remove `&& sort_exec.fetch().is_some()` check directly. I `OR`ed this check with the config option (can be found [here](https://github.com/synnada-ai/arrow-datafusion/blob/6db42485b016ef0acb183cb70e391bfc6910d4f9/datafusion/core/src/physical_optimizer/global_sort_selection.rs#L59)). In case, one wants to toggle this feature. As you, and @alamb say in some contexts this may not be what users want.
   
   @mustafasrepo 
   My suggestion is just removing this fetch check and use the config option you added in this PR to turn on/off the optimization.  
   @alamb How do you think ?
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420617520

   > > > @alamb @ozankabak @mustafasrepo
   > > > Regarding the global sort replaced to a parallel version(SortPreservingMergeExec + Local Sort) optimization, I think there is already a rule `GlobalSortSelection` for the exact purpose. I think we should not let the Sort Enforcement rule to handle this again. Implement/enhance such optimization in the `GlobalSortSelection` rule is more straightforward and do not need to care the positions of the `CoalescePartitionsExec`.
   > > 
   > > 
   > > I am not sure how we can do all the local sort + merge substitutions just with `GlobalSortSelection`, which doesn't track coalesce operations on partitions as you rightly point out. Note that we handle (and parallel-optimize) not just top level sorts, but sorts at any depth within the plan, even with intermediate executors in between the coalesce operation and the sort in question.
   > > We will take a deeper look today and see if we can move over the logic to `GlobalSortSelection` while still preserving the same functionality. If we can, great -- if not, we will share an example that blocks this. Thank you for the suggestion 👍
   > 
   > Yes, please take a look at the `GlobalSortSelection` rule. This rule does not need to care about the position of `CoalescePartitionsExec` because `CoalescePartitionsExec`s are added by `EnforceDistribution` rule which is triggered after the `GlobalSortSelection` rule. The physical Sort Selection should happen in a very early stage of the physical optimization phase. I guess why the current `GlobalSortSelection` does not optimize all the `Global Sort` is because it is not that aggressive and has an additional check. If you comment that check, all the `Global Sort` should be replaced.
   > 
   > `&& sort_exec.fetch().is_some()`
   
   @mingmwang your suggestion works. This greatly simplifies the code. Thanks for the suggestion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099992736


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+
+/// This object is used within the [EnforceSorting] rule to track the closest
+/// `CoalescePartitionsExec` descendant(s) for every child of a plan.
+#[derive(Debug, Clone)]
+struct PlanWithCorrespondingCoalescePartitions {
+    plan: Arc<dyn ExecutionPlan>,
+    // For every child, keep a subtree of `ExecutionPlan`s starting from the
+    // child until the `CoalescePartitionsExec`(s) -- could be multiple for
+    // n-ary plans like Union -- that affect the output partitioning of the
+    // child. If the child has no connection to any `CoalescePartitionsExec`,
+    // simpliy store None (and not a subtree).
+    coalesce_onwards: Vec<Option<ExecTree>>,
+}
+
+impl PlanWithCorrespondingCoalescePartitions {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards: vec![None; length],
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| PlanWithCorrespondingCoalescePartitions::new(child))
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithCorrespondingCoalescePartitions {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_requirements = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            let children_plans = children_requirements
                 .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input ordering. If we are at
-                        // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && !element.is_empty() {
-                                return element.clone();
-                            }
+                .map(|item| item.plan.clone())
+                .collect();
+            let coalesce_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec`
+                    // operators. This tree collects all the intermediate executors that
+                    // maintain a single partition. If we just saw a `CoalescePartitionsExec`
+                    // operator, we reset the tree and start accumulating.
+                    let plan = item.plan;
+                    if plan.as_any().is::<CoalescePartitionsExec>() {
+                        Some(ExecTree {
+                            idx,
+                            plan,
+                            children: vec![],
+                        })
+                    } else if plan.children().is_empty() {
+                        // Plan has no children, there is nothing to propagate.
+                        None
+                    } else {
+                        let children = item
+                            .coalesce_onwards
+                            .into_iter()
+                            .flatten()
+                            .filter(|item| {
+                                // Only consider operators that don't require a
+                                // single partition.
+                                !matches!(
+                                    plan.required_input_distribution()[item.idx],
+                                    Distribution::SinglePartition
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        if children.is_empty() {
+                            None
+                        } else {
+                            Some(ExecTree {
+                                idx,
+                                plan,
+                                children,
+                            })
                         }
                     }
-                    vec![]
                 })
-                .collect::<Vec<_>>();
+                .collect();
             let plan = with_new_children_if_necessary(self.plan, children_plans)?;
-            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+            Ok(PlanWithCorrespondingCoalescePartitions {
+                plan,
+                coalesce_onwards,
+            })
         }
     }
 }
 

Review Comment:
   Moved these to init method. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421617531

   I think it makes sense to explain the evolution of this so that everybody has a richer context. We actually started with two rules, one enforcement rule and one optimization rule (remember `optimize_sorts`?). As we worked on that design over time, the optimization rule grew to carry out the following tasks:
   - Optimization rule sometimes removes sorts (only).
   - Optimization rule sometimes moves a sort somewhere else.
   - Optimization rule sometimes ends up removing multiple sorts but doing so requires adding fewer sorts elsewhere.
   
   Here, you see examples of a general pattern where there could be `N` additions and `M` removals. Actually, simple enforcement corresponds to the case `M` = 0, and the optimization rule can end up with any of the remaining cases. Since doing the latter requires almost 95% of the code of a general any `N`/any `M` solution, we ended up merging the rules. This greatly reduced code duplication and IMO made the rule easier to use.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099988290


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -184,225 +431,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        requirements.plan.as_any().downcast_ref::<WindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering information propagates with
-                    //       necessary lineage information, compare `sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }
+                } else {
+                    // Remove the sort:
+                    PlanWithCorrespondingSort {
+                        plan: sort_input,
+                        sort_onwards: sort_onwards.to_vec(),
+                    }
+                },
+            );
         }
     }
-    Ok(None)
+    None
 }
 
 /// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
 /// it may allow removing a sort.
 fn analyze_window_sort_removal(
-    window_expr: &[Arc<dyn WindowExpr>],
-    partition_keys: &[Arc<dyn PhysicalExpr>],
-    sort_exec: &SortExec,
-    sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    sort_tree: &mut ExecTree,
+    window_exec: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
-    let required_ordering = sort_exec.output_ordering().ok_or_else(|| {
-        DataFusionError::Plan("A SortExec should have output ordering".to_string())
-    })?;
-    let physical_ordering = sort_exec.input().output_ordering();
-    let physical_ordering = if let Some(physical_ordering) = physical_ordering {
-        physical_ordering
+    let (window_expr, partition_keys) = if let Some(exec) =
+        window_exec.as_any().downcast_ref::<BoundedWindowAggExec>()
+    {
+        (exec.window_expr(), &exec.partition_keys)
+    } else if let Some(exec) = window_exec.as_any().downcast_ref::<WindowAggExec>() {
+        (exec.window_expr(), &exec.partition_keys)
     } else {
-        // If there is no physical ordering, there is no way to remove a sort -- immediately return:
-        return Ok(None);
+        return Err(DataFusionError::Plan(
+            "Expects to receive either WindowAggExec of BoundedWindowAggExec".to_string(),
+        ));
     };
-    let (can_skip_sorting, should_reverse) = can_skip_sort(
-        window_expr[0].partition_by(),
-        required_ordering,
-        &sort_exec.input().schema(),
-        physical_ordering,
-    )?;
-    if can_skip_sorting {
-        let new_window_expr = if should_reverse {
-            window_expr
-                .iter()
-                .map(|e| e.get_reverse_expr())
-                .collect::<Option<Vec<_>>>()
-        } else {
-            Some(window_expr.to_vec())
-        };
-        if let Some(window_expr) = new_window_expr {
-            let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
-            let new_schema = new_child.schema();
-
-            let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
-            // If all window exprs can run with bounded memory choose bounded window variant
-            let new_plan = if uses_bounded_memory {
-                Arc::new(BoundedWindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
+
+    let mut first_should_reverse = None;
+    let mut physical_ordering_common = vec![];
+    for sort_any in sort_tree.get_leaves() {
+        let sort_output_ordering = sort_any.output_ordering();
+        // Variable `sort_any` will either be a `SortExec` or a
+        // `SortPreservingMergeExec`, and both have a single child.
+        // Therefore, we can use the 0th index without loss of generality.
+        let sort_input = sort_any.children()[0].clone();
+        let physical_ordering = sort_input.output_ordering();
+        // TODO: Once we can ensure that required ordering information propagates with
+        //       the necessary lineage information, compare `physical_ordering` and the
+        //       ordering required by the window executor instead of `sort_output_ordering`.
+        //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
+        //       Currently, we can not remove such sorts.
+        let required_ordering = sort_output_ordering.ok_or_else(|| {
+            DataFusionError::Plan("A SortExec should have output ordering".to_string())
+        })?;
+        if let Some(physical_ordering) = physical_ordering {
+            if physical_ordering_common.is_empty()
+                || physical_ordering.len() < physical_ordering_common.len()
+            {
+                physical_ordering_common = physical_ordering.to_vec();
+            }
+            let (can_skip_sorting, should_reverse) = can_skip_sort(
+                window_expr[0].partition_by(),
+                required_ordering,
+                &sort_input.schema(),
+                physical_ordering,
+            )?;
+            if !can_skip_sorting {
+                return Ok(None);
+            }
+            if let Some(first_should_reverse) = first_should_reverse {
+                if first_should_reverse != should_reverse {
+                    return Ok(None);
+                }
             } else {
-                Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
-            };
-            return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+                first_should_reverse = Some(should_reverse);
+            }
+        } else {
+            // If there is no physical ordering, there is no way to remove a
+            // sort, so immediately return.
+            return Ok(None);
         }
     }
+    let new_window_expr = if first_should_reverse.unwrap() {
+        window_expr
+            .iter()
+            .map(|e| e.get_reverse_expr())
+            .collect::<Option<Vec<_>>>()
+    } else {
+        Some(window_expr.to_vec())
+    };
+    if let Some(window_expr) = new_window_expr {
+        let requires_single_partition = matches!(
+            window_exec.required_input_distribution()[sort_tree.idx],
+            Distribution::SinglePartition
+        );
+        let new_child = remove_corresponding_sort_from_sub_plan(
+            sort_tree,
+            requires_single_partition,
+        )?;
+        let new_schema = new_child.schema();
+
+        let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
+        // If all window expressions can run with bounded memory, choose the
+        // bounded window variant:
+        let new_plan = if uses_bounded_memory {
+            Arc::new(BoundedWindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        } else {
+            Arc::new(WindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        };
+        return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+    }
     Ok(None)
 }
 
-/// Updates child to remove the unnecessary sorting below it.
-fn update_child_to_remove_unnecessary_sort(
+/// Updates child to remove the unnecessary `CoalescePartitions` below it.
+fn update_child_to_change_coalesce(
     child: &mut Arc<dyn ExecutionPlan>,
-    sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    coalesce_onwards: &mut Option<ExecTree>,
+    sort_exec: Option<&SortExec>,
 ) -> Result<()> {
-    if !sort_onwards.is_empty() {
-        *child = remove_corresponding_sort_from_sub_plan(sort_onwards)?;
+    if let Some(coalesce_onwards) = coalesce_onwards {
+        *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?;
     }
     Ok(())
 }
 
-/// Converts an [ExecutionPlan] trait object to a [SortExec] when possible.
-fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&SortExec> {
-    sort_any.as_any().downcast_ref::<SortExec>().ok_or_else(|| {
-        DataFusionError::Plan("Given ExecutionPlan is not a SortExec".to_string())
-    })
+/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
+fn change_corresponding_coalesce_in_sub_plan(
+    coalesce_onwards: &mut ExecTree,
+    sort_exec: Option<&SortExec>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    Ok(
+        if coalesce_onwards
+            .plan
+            .as_any()
+            .is::<CoalescePartitionsExec>()
+        {
+            // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
+            let coalesce_input = coalesce_onwards.plan.children()[0].clone();
+            if let Some(sort_exec) = sort_exec {
+                let sort_expr = sort_exec.expr();
+                if !ordering_satisfy(
+                    coalesce_input.output_ordering(),
+                    Some(sort_expr),
+                    || coalesce_input.equivalence_properties(),
+                ) {
+                    return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
+                }
+            }
+            coalesce_input

Review Comment:
   I believe most of the time the global `SortExec` + `CoalescePartitionsExec` are adjacent nodes.  But is it possible that there will be some Projections between the `SortExec` and `CoalescePartitionsExec`  which will make the SortExec has totally different exprs/columns 
   with the `CoalescePartitionsExec's` input plan ?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1097703424


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1980,8 +1980,8 @@ async fn left_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",

Review Comment:
   is the idea here that SortExec already coalsces  within itself so  CoalscePartitionsExec isn't needed?



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,87 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(2);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",

Review Comment:
   🤔  I spent some time thinking this through and I think repartitioning a sorted stream to just remerge it is ok. This plan in particular seems like it would be better without the repartition / merge because the Projection isn't doing anything. Perhaps that will be fixed by https://github.com/apache/arrow-datafusion/pull/5074 from @xiaoyong-z



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,34 +134,170 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
-                .collect::<Vec<_>>();
+                .map(|item| item.plan.clone())
+                .collect();
             let sort_onwards = children_requirements
-                .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input ordering. If we are at
-                        // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && !element.is_empty() {
-                                return element.clone();
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` tree are sort-introducing operators
+                    // (e.g `SortExec`, `SortPreservingMergeExec`). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let is_union = plan.as_any().is::<UnionExec>();
+                    // If the executor is a `UnionExec`, and it has an output ordering;
+                    // then it at least partially maintains some child's output ordering.
+                    // Therefore, we propagate this information upwards.
+                    let partially_maintains =
+                        is_union && plan.output_ordering().is_some();

Review Comment:
   I wonder if there is any way to make these decisions on properties of the plan (i.e. has multiple inputs and an output ordering) rather than the type of operator (Union)?



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -775,6 +1187,133 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort2 = sort_exec(sort_exprs.clone(), spm);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort3 = sort_exec(sort_exprs, spm2);
+        let physical_plan = repartition_exec(repartition_exec(sort3));
+
+        let expected_input = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "            SortExec: [non_nullable_col@1 ASC]",
+            "              MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort3() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition_exec = repartition_exec(spm);
+        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let physical_plan = aggregate_exec(spm2);
+
+        // When removing a `SortPreservingMergeExec`, make sure that partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "          SortExec: [non_nullable_col@1 ASC]",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_do_not_remove_sort_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+        let limit = local_limit_exec(sort);
+        let limit = global_limit_exec(limit);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, limit]);
+        let repartition = repartition_exec(union);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, repartition);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      GlobalLimitExec: skip=0, fetch=100",
+            "        LocalLimitExec: fetch=100",
+            "          SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "            ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+
+        // We should keep the bottom `SortExec`.
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "      UnionExec",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        GlobalLimitExec: skip=0, fetch=100",
+            "          LocalLimitExec: fetch=100",
+            "            SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "              ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]

Review Comment:
   I find this behavior strange -- the fact that sort enforcement and optimization are done in a single pass. Maybe eventually we could move the "fix sort order to be correct" as one pass and then "optimize away unecessary sorts" as a follow on pass
   



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -775,6 +1187,133 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort2 = sort_exec(sort_exprs.clone(), spm);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort3 = sort_exec(sort_exprs, spm2);
+        let physical_plan = repartition_exec(repartition_exec(sort3));
+
+        let expected_input = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "            SortExec: [non_nullable_col@1 ASC]",
+            "              MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort3() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition_exec = repartition_exec(spm);
+        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let physical_plan = aggregate_exec(spm2);
+
+        // When removing a `SortPreservingMergeExec`, make sure that partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "          SortExec: [non_nullable_col@1 ASC]",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_do_not_remove_sort_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+        let limit = local_limit_exec(sort);
+        let limit = global_limit_exec(limit);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, limit]);
+        let repartition = repartition_exec(union);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, repartition);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      GlobalLimitExec: skip=0, fetch=100",
+            "        LocalLimitExec: fetch=100",
+            "          SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "            ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+
+        // We should keep the bottom `SortExec`.
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "      UnionExec",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        GlobalLimitExec: skip=0, fetch=100",
+            "          LocalLimitExec: fetch=100",
+            "            SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "              ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_change_wrong_sorting() -> Result<()> {

Review Comment:
   Can I please request you don't change the inputs to the existing tests (aka keep `test_change_wrong_sorting`, `test_union_inputs_sorted`, ) for this initial PR review? I would like to know how the code changes have changed the existing tests.
   
   
   Once we have merged this PR, if some of the tests are redundant perhaps we can remove them as a follow on PM



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -775,6 +1187,133 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort2 = sort_exec(sort_exprs.clone(), spm);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort3 = sort_exec(sort_exprs, spm2);
+        let physical_plan = repartition_exec(repartition_exec(sort3));
+
+        let expected_input = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "            SortExec: [non_nullable_col@1 ASC]",
+            "              MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort3() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition_exec = repartition_exec(spm);
+        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let physical_plan = aggregate_exec(spm2);
+
+        // When removing a `SortPreservingMergeExec`, make sure that partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",

Review Comment:
   👍  that is pretty fancy



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -897,6 +1436,332 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted3() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort1 = sort_exec(sort_exprs1, source1.clone());
+        let sort_exprs2 = vec![sort_expr("nullable_col", &schema)];
+        let sort2 = sort_exec(sort_exprs2, source1);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs.clone());
+
+        let union = union_exec(vec![sort1, source2, sort2]);
+        let physical_plan = sort_preserving_merge_exec(parquet_sort_exprs, union);
+
+        // First input to the union is not Sorted (SortExec is finer than required ordering by the SortPreservingMergeExec above).
+        // Second input to the union is already Sorted (matches with the required ordering by the SortPreservingMergeExec above).
+        // Third input to the union is not Sorted (SortExec is matches required ordering by the SortPreservingMergeExec above).
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        // should adjust sorting in the first input of the union such that it is not unnecessarily fine
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",

Review Comment:
   that is a pretty clever plan



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420248052

   > @mustafasrepo @ozankabak Regarding the rule applying ordering, since DataFusion optimization framework is still a traditional heuristic style framework, the rule applying orders always matter, we can not assume one rule can work independently without the others.
   > 
   > Specifically , `EnforceDistribution` rule is responsible for handling the global distribution requirements. And `EnforceSorting` rule is responsible for handling the local sort requirements. It's also responsible for removing unnecessary global sort and local sort. The global distribution requirements need to be handled first, after that we can handle the local sort(inner-partition) requirements.
   > 
   > Global properties vs Local properties http://www.cs.albany.edu/~jhh/courses/readings/zhou10.pdf
   
   I agree that fixing partitioning (global) and then sorting (local) is the more intuitive order, but this does not seem strictly necessary to me in theory. I can imagine changing global properties while still preserving the previous local properties for every partition (in the new plan). I think such a behavior would make rules very robust and easy to reason with. The current PR is not really about this anyway, but that's my general line of thinking when we refer to orthogonality.
   
   Nevertheless, maybe you are aware of a fundamental issue (that I am not foreseeing right now) which makes this impossible. If that is the case, then we will go with the current status quo, of course.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1422166014

   > I don't think this is a bug.
   I also agree. However, we may need to refine the UT to handle this non-deterministic ordering.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1423317594

   Benchmark runs are scheduled for baseline = e16650073320dec3b0517aba12c6ce738eb4a503 and contender = dee9fd7d2b9a3dbe57fb88fb9cbe9572f6117ab2. dee9fd7d2b9a3dbe57fb88fb9cbe9572f6117ab2 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/cf3742e6981a4af5ae66351ad7a2e4d4...38d42e69e4f549f2be79d3e6f20d8521/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/fa6bd3729c964e19a186ff61d9f5c551...d14d27258bcb4eaeb842d0142f8cd9fa/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/dbce847abd2e4bd5831fa695486049f8...b6c769a95a0b45eab8b3c56ad9938086/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/4e70146050f9458eb8d46831ae41b9c7...e3167220c38347489051978a1bfda7c8/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099944661


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+
+/// This object is used within the [EnforceSorting] rule to track the closest
+/// `CoalescePartitionsExec` descendant(s) for every child of a plan.
+#[derive(Debug, Clone)]
+struct PlanWithCorrespondingCoalescePartitions {
+    plan: Arc<dyn ExecutionPlan>,
+    // For every child, keep a subtree of `ExecutionPlan`s starting from the
+    // child until the `CoalescePartitionsExec`(s) -- could be multiple for
+    // n-ary plans like Union -- that affect the output partitioning of the
+    // child. If the child has no connection to any `CoalescePartitionsExec`,
+    // simpliy store None (and not a subtree).
+    coalesce_onwards: Vec<Option<ExecTree>>,
+}
+
+impl PlanWithCorrespondingCoalescePartitions {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards: vec![None; length],
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| PlanWithCorrespondingCoalescePartitions::new(child))
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithCorrespondingCoalescePartitions {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_requirements = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            let children_plans = children_requirements
                 .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input ordering. If we are at
-                        // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && !element.is_empty() {
-                                return element.clone();
-                            }
+                .map(|item| item.plan.clone())
+                .collect();
+            let coalesce_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec`
+                    // operators. This tree collects all the intermediate executors that
+                    // maintain a single partition. If we just saw a `CoalescePartitionsExec`
+                    // operator, we reset the tree and start accumulating.
+                    let plan = item.plan;
+                    if plan.as_any().is::<CoalescePartitionsExec>() {
+                        Some(ExecTree {
+                            idx,
+                            plan,
+                            children: vec![],
+                        })
+                    } else if plan.children().is_empty() {
+                        // Plan has no children, there is nothing to propagate.
+                        None
+                    } else {
+                        let children = item
+                            .coalesce_onwards
+                            .into_iter()
+                            .flatten()
+                            .filter(|item| {
+                                // Only consider operators that don't require a
+                                // single partition.
+                                !matches!(
+                                    plan.required_input_distribution()[item.idx],
+                                    Distribution::SinglePartition
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        if children.is_empty() {
+                            None
+                        } else {
+                            Some(ExecTree {
+                                idx,
+                                plan,
+                                children,
+                            })
                         }
                     }
-                    vec![]
                 })
-                .collect::<Vec<_>>();
+                .collect();
             let plan = with_new_children_if_necessary(self.plan, children_plans)?;
-            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+            Ok(PlanWithCorrespondingCoalescePartitions {
+                plan,
+                coalesce_onwards,
+            })
         }
     }
 }
 
+/// The boolean flag `repartition_sorts` defined in the config indicates
+/// whether we elect to transform CoalescePartitionsExec + SortExec cascades
+/// into SortExec + SortPreservingMergeExec cascades, which enables us to
+/// perform sorting in parallel.
 impl PhysicalOptimizerRule for EnforceSorting {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Execute a post-order traversal to adjust input key ordering:
         let plan_requirements = PlanWithCorrespondingSort::new(plan);

Review Comment:
   Please remove this wrong comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099922769


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+

Review Comment:
   There is heavy logic in the `map_children()` method. Can we move the heavy logic to other places like the move to the `transform()`. Usually `map_children()` just apply the `transform` logic to all the `children` and construct new children.  We can also have a new `init()` method for `PlanWithCorrespondingSort` and handle the construction of `sort_onwards` from the current plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099922769


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+

Review Comment:
   There is heavy logic in the `map_children()` method. Can we move the heavy logic to other places like the move to the `transform()`. Usually `map_children()` just apply the `transform` logic to all the `children` and construct new children.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1423044279

   Thanks for all the detailed reviews everybody, we really appreciate it! Rest assured we will be maintaining this and fix any bugs should we discover them in the future.
   
   We will also take care of the follow-ons identified during the review.
   
   @alamb, from our perspective, this is ready to go. Feel free to merge when you feel appropriate.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1422436446

   Overall the PR LGTM and I think it is good to merge.  I will need more time to take a more closer look but I think it is good to go since there are enough UTs to verify all the changes.  Some review comments can be addressed in following PRs for the global sort related enhancement.
   
   @mustafasrepo @ozankabak Thanks a lot!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1419559955

   BTW I checked this code against IOx's tests -- https://github.com/influxdata/influxdb_iox/pull/6870 -- it looks good to me


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1422162788

   > > I think this is because that global sort + CoalescePartitionsExec were added later by the two enforcement rules.
   > > An easy way to get ride from this is to run the GlobalSortSelection rule again after the two enforcement rules. I would prefer still let the GlobalSortSelection rule handle this optimization. Need to be enhance GlobalSortSelection rule to handle the SortExec + CoalescePartitionsExec combination.
   > 
   > If we end up handling this combination there, and running it twice; it really diminishes the value of that approach. Maybe there is a way to do it elegantly, I will think about it in detail. If we (or you) can figure out a way to do this elegantly, we can go back to this approach; but for now, it doesn't look too good to me.
   > 
   > > Another approach I can think is maybe we can have a specific handling in EnforceDistribution rule, if the plan 's distribution requirement is Distribution::SinglePartition and the plan also has some sorting requirements, add the prefer-parallel-sort configuration is on, add SortPreservingMergeExec + SortExec. If the SortExec is unnecessary, it will be removed later by the EnforceSorting rule
   > 
   > I think this is interesting and sounds more promising to me. I will think about this today, maybe we can do this in a follow-on PR.
   
   I also prefer the second approach to handle the choice between "global sort + CoalescePartitionsExec" and "SortPreservingMergeExec + local SortExec" in **EnforceDistribution**.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099245227


##########
datafusion/common/src/config.rs:
##########
@@ -290,6 +290,17 @@ config_namespace! {
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
 
+        /// Should DataFusion parallelize Sort during physical plan creation.

Review Comment:
   👍 



##########
datafusion/common/src/config.rs:
##########
@@ -290,6 +290,17 @@ config_namespace! {
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
 
+        /// Should DataFusion parallelize Sort during physical plan creation.

Review Comment:
   Calling this option `repartition_sorts` is probably more consistent with `repartition_windows`, `repartition_joins`, etc -- however, I think we can rename it in a follow on PR as well



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",

Review Comment:
   this double repartition still looks strange to me, but I understand it was not introduced by this PR



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",

Review Comment:
   this is a pretty clever plan (to repartition first, then do the sort in parallel, and then merge the results at the end) 👍 



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_with_global_limit() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(1);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 1)";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]",
+            "  AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "    AggregateExec: mode=Partial, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "      GlobalLimitExec: skip=0, fetch=1",
+            "        SortExec: [c13@0 ASC NULLS LAST]",
+            "          ProjectionExec: expr=[c13@0 as c13]",

Review Comment:
   We don't need to fix it in this PR, but this plan could be a lot better (it should be using a limit I think in the sort)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1422174129

   > > I don't think this is a bug.
   > 
   > I also agree. However, we may need to refine the UT to handle this non-deterministic ordering.
   
   If you guys all agree this is not a bug, I accept this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098339730


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,87 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(2);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",

Review Comment:
   I have updated above test such that when the change in #5074 merged, test case will still be valid(There will still be`RepartitionExec` in the plan so that we would be able to verify at the end there is `SortPreservingMergeExec`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098275745


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -775,6 +1187,133 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort2 = sort_exec(sort_exprs.clone(), spm);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort3 = sort_exec(sort_exprs, spm2);
+        let physical_plan = repartition_exec(repartition_exec(sort3));
+
+        let expected_input = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "            SortExec: [non_nullable_col@1 ASC]",
+            "              MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort3() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition_exec = repartition_exec(spm);
+        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let physical_plan = aggregate_exec(spm2);
+
+        // When removing a `SortPreservingMergeExec`, make sure that partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "          SortExec: [non_nullable_col@1 ASC]",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_do_not_remove_sort_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+        let limit = local_limit_exec(sort);
+        let limit = global_limit_exec(limit);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, limit]);
+        let repartition = repartition_exec(union);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, repartition);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      GlobalLimitExec: skip=0, fetch=100",
+            "        LocalLimitExec: fetch=100",
+            "          SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "            ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+
+        // We should keep the bottom `SortExec`.
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "      UnionExec",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        GlobalLimitExec: skip=0, fetch=100",
+            "          LocalLimitExec: fetch=100",
+            "            SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "              ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]

Review Comment:
   We can separate enforcement functionality in a pass then optimize away unnecessary Sorts. However, optimization code would be exactly same with this one (Since during removal we may invalidate ordering requirements for above layers and we need to add necessary Sort). In that case, first enforcement pass would be redundant. Hence, separating them doesn't help. However, I will think through it in detail. Maybe we can find a way to accomplish this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098333949


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -33,48 +33,80 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::add_sort_above_child;
 use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{reverse_sort_options, DataFusionError};
 use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
-use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use itertools::izip;
+use itertools::{concat, izip};
 use std::iter::zip;
 use std::sync::Arc;
 
 /// This rule inspects SortExec's in the given physical plan and removes the
-/// ones it can prove unnecessary.
+/// ones it can prove unnecessary. The boolean flag `parallelize_sorts`
+/// indicates whether we elect to transform CoalescePartitionsExec + SortExec
+/// cascades into SortExec + SortPreservingMergeExec cascades, which enables
+/// us to perform sorting in parallel.
 #[derive(Default)]
-pub struct EnforceSorting {}
+pub struct EnforceSorting {
+    parallelize_sorts: bool,
+}
 
 impl EnforceSorting {
     #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(parallelize_sorts: bool) -> Self {

Review Comment:
   This flag is now a configuration parameter. I add a test to verify `false` case also. Test can be found [here](https://github.com/synnada-ai/arrow-datafusion/blob/0ddc0a756a032081bee2f8f29995c35b50d72b6a/datafusion/core/tests/sql/window.rs#L2426-L2462)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099988290


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -184,225 +431,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        requirements.plan.as_any().downcast_ref::<WindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering information propagates with
-                    //       necessary lineage information, compare `sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }
+                } else {
+                    // Remove the sort:
+                    PlanWithCorrespondingSort {
+                        plan: sort_input,
+                        sort_onwards: sort_onwards.to_vec(),
+                    }
+                },
+            );
         }
     }
-    Ok(None)
+    None
 }
 
 /// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
 /// it may allow removing a sort.
 fn analyze_window_sort_removal(
-    window_expr: &[Arc<dyn WindowExpr>],
-    partition_keys: &[Arc<dyn PhysicalExpr>],
-    sort_exec: &SortExec,
-    sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    sort_tree: &mut ExecTree,
+    window_exec: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
-    let required_ordering = sort_exec.output_ordering().ok_or_else(|| {
-        DataFusionError::Plan("A SortExec should have output ordering".to_string())
-    })?;
-    let physical_ordering = sort_exec.input().output_ordering();
-    let physical_ordering = if let Some(physical_ordering) = physical_ordering {
-        physical_ordering
+    let (window_expr, partition_keys) = if let Some(exec) =
+        window_exec.as_any().downcast_ref::<BoundedWindowAggExec>()
+    {
+        (exec.window_expr(), &exec.partition_keys)
+    } else if let Some(exec) = window_exec.as_any().downcast_ref::<WindowAggExec>() {
+        (exec.window_expr(), &exec.partition_keys)
     } else {
-        // If there is no physical ordering, there is no way to remove a sort -- immediately return:
-        return Ok(None);
+        return Err(DataFusionError::Plan(
+            "Expects to receive either WindowAggExec of BoundedWindowAggExec".to_string(),
+        ));
     };
-    let (can_skip_sorting, should_reverse) = can_skip_sort(
-        window_expr[0].partition_by(),
-        required_ordering,
-        &sort_exec.input().schema(),
-        physical_ordering,
-    )?;
-    if can_skip_sorting {
-        let new_window_expr = if should_reverse {
-            window_expr
-                .iter()
-                .map(|e| e.get_reverse_expr())
-                .collect::<Option<Vec<_>>>()
-        } else {
-            Some(window_expr.to_vec())
-        };
-        if let Some(window_expr) = new_window_expr {
-            let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
-            let new_schema = new_child.schema();
-
-            let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
-            // If all window exprs can run with bounded memory choose bounded window variant
-            let new_plan = if uses_bounded_memory {
-                Arc::new(BoundedWindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
+
+    let mut first_should_reverse = None;
+    let mut physical_ordering_common = vec![];
+    for sort_any in sort_tree.get_leaves() {
+        let sort_output_ordering = sort_any.output_ordering();
+        // Variable `sort_any` will either be a `SortExec` or a
+        // `SortPreservingMergeExec`, and both have a single child.
+        // Therefore, we can use the 0th index without loss of generality.
+        let sort_input = sort_any.children()[0].clone();
+        let physical_ordering = sort_input.output_ordering();
+        // TODO: Once we can ensure that required ordering information propagates with
+        //       the necessary lineage information, compare `physical_ordering` and the
+        //       ordering required by the window executor instead of `sort_output_ordering`.
+        //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
+        //       Currently, we can not remove such sorts.
+        let required_ordering = sort_output_ordering.ok_or_else(|| {
+            DataFusionError::Plan("A SortExec should have output ordering".to_string())
+        })?;
+        if let Some(physical_ordering) = physical_ordering {
+            if physical_ordering_common.is_empty()
+                || physical_ordering.len() < physical_ordering_common.len()
+            {
+                physical_ordering_common = physical_ordering.to_vec();
+            }
+            let (can_skip_sorting, should_reverse) = can_skip_sort(
+                window_expr[0].partition_by(),
+                required_ordering,
+                &sort_input.schema(),
+                physical_ordering,
+            )?;
+            if !can_skip_sorting {
+                return Ok(None);
+            }
+            if let Some(first_should_reverse) = first_should_reverse {
+                if first_should_reverse != should_reverse {
+                    return Ok(None);
+                }
             } else {
-                Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
-            };
-            return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+                first_should_reverse = Some(should_reverse);
+            }
+        } else {
+            // If there is no physical ordering, there is no way to remove a
+            // sort, so immediately return.
+            return Ok(None);
         }
     }
+    let new_window_expr = if first_should_reverse.unwrap() {
+        window_expr
+            .iter()
+            .map(|e| e.get_reverse_expr())
+            .collect::<Option<Vec<_>>>()
+    } else {
+        Some(window_expr.to_vec())
+    };
+    if let Some(window_expr) = new_window_expr {
+        let requires_single_partition = matches!(
+            window_exec.required_input_distribution()[sort_tree.idx],
+            Distribution::SinglePartition
+        );
+        let new_child = remove_corresponding_sort_from_sub_plan(
+            sort_tree,
+            requires_single_partition,
+        )?;
+        let new_schema = new_child.schema();
+
+        let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
+        // If all window expressions can run with bounded memory, choose the
+        // bounded window variant:
+        let new_plan = if uses_bounded_memory {
+            Arc::new(BoundedWindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        } else {
+            Arc::new(WindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        };
+        return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+    }
     Ok(None)
 }
 
-/// Updates child to remove the unnecessary sorting below it.
-fn update_child_to_remove_unnecessary_sort(
+/// Updates child to remove the unnecessary `CoalescePartitions` below it.
+fn update_child_to_change_coalesce(
     child: &mut Arc<dyn ExecutionPlan>,
-    sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    coalesce_onwards: &mut Option<ExecTree>,
+    sort_exec: Option<&SortExec>,
 ) -> Result<()> {
-    if !sort_onwards.is_empty() {
-        *child = remove_corresponding_sort_from_sub_plan(sort_onwards)?;
+    if let Some(coalesce_onwards) = coalesce_onwards {
+        *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?;
     }
     Ok(())
 }
 
-/// Converts an [ExecutionPlan] trait object to a [SortExec] when possible.
-fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&SortExec> {
-    sort_any.as_any().downcast_ref::<SortExec>().ok_or_else(|| {
-        DataFusionError::Plan("Given ExecutionPlan is not a SortExec".to_string())
-    })
+/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
+fn change_corresponding_coalesce_in_sub_plan(
+    coalesce_onwards: &mut ExecTree,
+    sort_exec: Option<&SortExec>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    Ok(
+        if coalesce_onwards
+            .plan
+            .as_any()
+            .is::<CoalescePartitionsExec>()
+        {
+            // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
+            let coalesce_input = coalesce_onwards.plan.children()[0].clone();
+            if let Some(sort_exec) = sort_exec {
+                let sort_expr = sort_exec.expr();
+                if !ordering_satisfy(
+                    coalesce_input.output_ordering(),
+                    Some(sort_expr),
+                    || coalesce_input.equivalence_properties(),
+                ) {
+                    return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
+                }
+            }
+            coalesce_input

Review Comment:
   I believe most of the time the global `SortExec` + `CoalescePartitionsExec` are adjacent nodes.  But is it possible that there will be some Projections between the `SortExec` and `CoalescePartitionsExec`  which will make the SortExec has totally different exprs/columns 
   with the `CoalescePartitionsExec's` input plan ?  If that is the case, `ordering_satisfy` will
   return false definitely and add `SortExec` on top of the `coalesce_input` will generate invalid plan.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099889802


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -58,23 +60,47 @@ impl EnforceSorting {
     }
 }
 
-/// This is a "data class" we use within the [EnforceSorting] rule that
-/// tracks the closest `SortExec` descendant for every child of a plan.
+/// This object implements a tree that we use while keeping track of paths
+/// leading to `SortExec`s.
+#[derive(Debug, Clone)]
+struct ExecTree {
+    /// Child index of the plan in its parent
+    pub idx: usize,
+    /// Children of the plan that would need updating if we remove leaf executors
+    pub children: Vec<ExecTree>,
+    /// The `ExecutionPlan` associated with this node
+    pub plan: Arc<dyn ExecutionPlan>,
+}
+

Review Comment:
   Nice! This abstraction is much cleaner and easy to understand than before !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099911470


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })

Review Comment:
   It is quite dangerous to compare the child `output ordering` with the parent plan's output ordering.  Some times the column indexes are changed and can not satisfy even the plan actually maintains the output ordering.  I think we already have the `maintains_input_order()` method in the `ExecutionPlan`, can we use that ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099944842


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })

Review Comment:
   You are right. It now uses result of the `maintains_input_order()` to decide propagation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420174086

   @mustafasrepo @ozankabak 
   Regarding the rule applying ordering, since DataFusion optimization framework is still a traditional heuristic style framework, the rule applying orders always matter, we can not assume one rule can work independently without the others.  
   
   Specifically , `EnforceDistribution` rule is responsible for handling the global distribution requirements.
   And `EnforceSorting` rule is responsible for handling the local sort requirements. It's also responsible for removing
   unnecessary global sort and local sort.   The global distribution requirements need to be handled first, after that we can handle the local sort(inner-partition) requirements.
   
   Global properties vs Local properties
   http://www.cs.albany.edu/~jhh/courses/readings/zhou10.pdf
    
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420226651

   > @alamb @ozankabak @mustafasrepo
   > 
   > Regarding the global sort replaced to a parallel version(SortPreservingMergeExec + Local Sort) optimization, I think there is already a rule `GlobalSortSelection` for the exact purpose. I think we should not let the Sort Enforcement rule to handle this again. Implement/enhance such optimization in the `GlobalSortSelection` rule is more straightforward and do not need to care the positions of the `CoalescePartitionsExec`.
   
   I am not sure how we can do all the local sort + merge substitutions just with `GlobalSortSelection`, which doesn't track partitions as you rightly point out. Note that we handle (and parallel-optimize) not just top level sorts, but sorts at any depth within the plan, even with intermediate executors in between.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421171267

   > @mingmwang, I don't follow why you think there is a a bug in [this example](https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420625925). The query does not specify a top level ordering, so limiting the top or bottom 5 are both fair game from a query perspective. Looking at it from a logical vs. physical plan perspective, I still do not see a problem. Order annotations in logical plan aggregations are local to them, they don't have a bearing on output ordering AFAIK. These annotations specify the frame, but that's where their scope ends. If they had any bearing on output ordering, they would have the same effect at the query level too, but they don't.
   
   The query does not specify a top level ordering explicitly, but the final result ordering is impacted by the ordering expressions in WindowExecs,  I think with/without the (reverse window expr) optimization, this SQL will give different results, this is something we should avoid.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099988290


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -184,225 +431,380 @@ fn ensure_sorting(
                 );
                 if !is_ordering_satisfied {
                     // Make sure we preserve the ordering requirements:
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                    update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
-                    sort_onwards.push((idx, child.clone()))
+                    *sort_onwards = Some(ExecTree {
+                        idx,
+                        plan: child.clone(),
+                        children: vec![],
+                    })
                 }
-                if let [first, ..] = sort_onwards.as_slice() {
-                    // The ordering requirement is met, we can analyze if there is an unnecessary sort:
-                    let sort_any = first.1.clone();
-                    let sort_exec = convert_to_sort_exec(&sort_any)?;
-                    let sort_output_ordering = sort_exec.output_ordering();
-                    let sort_input_ordering = sort_exec.input().output_ordering();
-                    // Simple analysis: Does the input of the sort in question already satisfy the ordering requirements?
-                    if ordering_satisfy(sort_input_ordering, sort_output_ordering, || {
-                        sort_exec.input().equivalence_properties()
-                    }) {
-                        update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
-                    }
+                if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when we can
                     // calculate the result in reverse:
-                    else if let Some(exec) =
-                        requirements.plan.as_any().downcast_ref::<WindowAggExec>()
+                    if plan.as_any().is::<WindowAggExec>()
+                        || plan.as_any().is::<BoundedWindowAggExec>()
                     {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
-                            return Ok(Some(result));
-                        }
-                    } else if let Some(exec) = requirements
-                        .plan
-                        .as_any()
-                        .downcast_ref::<BoundedWindowAggExec>()
-                    {
-                        if let Some(result) = analyze_window_sort_removal(
-                            exec.window_expr(),
-                            &exec.partition_keys,
-                            sort_exec,
-                            sort_onwards,
-                        )? {
+                        if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
                             return Ok(Some(result));
                         }
                     }
-                    // TODO: Once we can ensure that required ordering information propagates with
-                    //       necessary lineage information, compare `sort_input_ordering` and `required_ordering`.
-                    //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
-                    //       Currently, we can not remove such sorts.
                 }
             }
             (Some(required), None) => {
-                // Ordering requirement is not met, we should add a SortExec to the plan.
-                let sort_expr = required.to_vec();
-                *child = add_sort_above_child(child, sort_expr)?;
-                *sort_onwards = vec![(idx, child.clone())];
+                // Ordering requirement is not met, we should add a `SortExec` to the plan.
+                *child = add_sort_above_child(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree {
+                    idx,
+                    plan: child.clone(),
+                    children: vec![],
+                })
             }
             (None, Some(_)) => {
-                // We have a SortExec whose effect may be neutralized by a order-imposing
-                // operator. In this case, remove this sort:
-                if !requirements.plan.maintains_input_order()[idx] {
-                    update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
+                // We have a `SortExec` whose effect may be neutralized by
+                // another order-imposing operator. Remove or update this sort:
+                if !plan.maintains_input_order()[idx] {
+                    let count = plan.output_ordering().map_or(0, |e| e.len());
+                    if (count > 0) && !is_sort(&plan) {
+                        update_child_to_change_finer_sort(child, sort_onwards, count)?;
+                    } else {
+                        update_child_to_remove_unnecessary_sort(
+                            child,
+                            sort_onwards,
+                            &plan,
+                        )?;
+                    }
                 }
             }
             (None, None) => {}
         }
     }
-    if plan.children().is_empty() {
-        Ok(Some(requirements))
-    } else {
-        let new_plan = requirements.plan.with_new_children(new_children)?;
-        for (idx, (trace, required_ordering)) in new_onwards
-            .iter_mut()
-            .zip(new_plan.required_input_ordering())
-            .enumerate()
-            .take(new_plan.children().len())
-        {
-            if new_plan.maintains_input_order()[idx]
-                && required_ordering.is_none()
-                && !trace.is_empty()
-            {
-                trace.push((idx, new_plan.clone()));
-            } else {
-                trace.clear();
-                if is_sort(&new_plan) {
-                    trace.push((idx, new_plan.clone()));
-                }
-            }
-        }
-        Ok(Some(PlanWithCorrespondingSort {
-            plan: new_plan,
-            sort_onwards: new_onwards,
-        }))
-    }
+    Ok(Some(PlanWithCorrespondingSort {
+        plan: plan.with_new_children(children)?,
+        sort_onwards,
+    }))
 }
 
-/// Analyzes a given `SortExec` to determine whether its input already has
-/// a finer ordering than this `SortExec` enforces.
+/// Analyzes a given `SortExec` (`plan`) to determine whether its input already
+/// has a finer ordering than this `SortExec` enforces.
 fn analyze_immediate_sort_removal(
-    requirements: &PlanWithCorrespondingSort,
-) -> Result<Option<PlanWithCorrespondingSort>> {
-    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+    plan: &Arc<dyn ExecutionPlan>,
+    sort_onwards: &[Option<ExecTree>],
+) -> Option<PlanWithCorrespondingSort> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let sort_input = sort_exec.input().clone();
         // If this sort is unnecessary, we should remove it:
         if ordering_satisfy(
-            sort_exec.input().output_ordering(),
+            sort_input.output_ordering(),
             sort_exec.output_ordering(),
-            || sort_exec.input().equivalence_properties(),
+            || sort_input.equivalence_properties(),
         ) {
             // Since we know that a `SortExec` has exactly one child,
             // we can use the zero index safely:
-            let mut new_onwards = requirements.sort_onwards[0].to_vec();
-            if !new_onwards.is_empty() {
-                new_onwards.pop();
-            }
-            return Ok(Some(PlanWithCorrespondingSort {
-                plan: sort_exec.input().clone(),
-                sort_onwards: vec![new_onwards],
-            }));
+            return Some(
+                if !sort_exec.preserve_partitioning()
+                    && sort_input.output_partitioning().partition_count() > 1
+                {
+                    // Replace the sort with a sort-preserving merge:
+                    let new_plan: Arc<dyn ExecutionPlan> =
+                        Arc::new(SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            sort_input,
+                        ));
+                    let new_tree = ExecTree {
+                        idx: 0,
+                        plan: new_plan.clone(),
+                        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
+                    };
+                    PlanWithCorrespondingSort {
+                        plan: new_plan,
+                        sort_onwards: vec![Some(new_tree)],
+                    }
+                } else {
+                    // Remove the sort:
+                    PlanWithCorrespondingSort {
+                        plan: sort_input,
+                        sort_onwards: sort_onwards.to_vec(),
+                    }
+                },
+            );
         }
     }
-    Ok(None)
+    None
 }
 
 /// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
 /// it may allow removing a sort.
 fn analyze_window_sort_removal(
-    window_expr: &[Arc<dyn WindowExpr>],
-    partition_keys: &[Arc<dyn PhysicalExpr>],
-    sort_exec: &SortExec,
-    sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    sort_tree: &mut ExecTree,
+    window_exec: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
-    let required_ordering = sort_exec.output_ordering().ok_or_else(|| {
-        DataFusionError::Plan("A SortExec should have output ordering".to_string())
-    })?;
-    let physical_ordering = sort_exec.input().output_ordering();
-    let physical_ordering = if let Some(physical_ordering) = physical_ordering {
-        physical_ordering
+    let (window_expr, partition_keys) = if let Some(exec) =
+        window_exec.as_any().downcast_ref::<BoundedWindowAggExec>()
+    {
+        (exec.window_expr(), &exec.partition_keys)
+    } else if let Some(exec) = window_exec.as_any().downcast_ref::<WindowAggExec>() {
+        (exec.window_expr(), &exec.partition_keys)
     } else {
-        // If there is no physical ordering, there is no way to remove a sort -- immediately return:
-        return Ok(None);
+        return Err(DataFusionError::Plan(
+            "Expects to receive either WindowAggExec of BoundedWindowAggExec".to_string(),
+        ));
     };
-    let (can_skip_sorting, should_reverse) = can_skip_sort(
-        window_expr[0].partition_by(),
-        required_ordering,
-        &sort_exec.input().schema(),
-        physical_ordering,
-    )?;
-    if can_skip_sorting {
-        let new_window_expr = if should_reverse {
-            window_expr
-                .iter()
-                .map(|e| e.get_reverse_expr())
-                .collect::<Option<Vec<_>>>()
-        } else {
-            Some(window_expr.to_vec())
-        };
-        if let Some(window_expr) = new_window_expr {
-            let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
-            let new_schema = new_child.schema();
-
-            let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
-            // If all window exprs can run with bounded memory choose bounded window variant
-            let new_plan = if uses_bounded_memory {
-                Arc::new(BoundedWindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
+
+    let mut first_should_reverse = None;
+    let mut physical_ordering_common = vec![];
+    for sort_any in sort_tree.get_leaves() {
+        let sort_output_ordering = sort_any.output_ordering();
+        // Variable `sort_any` will either be a `SortExec` or a
+        // `SortPreservingMergeExec`, and both have a single child.
+        // Therefore, we can use the 0th index without loss of generality.
+        let sort_input = sort_any.children()[0].clone();
+        let physical_ordering = sort_input.output_ordering();
+        // TODO: Once we can ensure that required ordering information propagates with
+        //       the necessary lineage information, compare `physical_ordering` and the
+        //       ordering required by the window executor instead of `sort_output_ordering`.
+        //       This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b).
+        //       Currently, we can not remove such sorts.
+        let required_ordering = sort_output_ordering.ok_or_else(|| {
+            DataFusionError::Plan("A SortExec should have output ordering".to_string())
+        })?;
+        if let Some(physical_ordering) = physical_ordering {
+            if physical_ordering_common.is_empty()
+                || physical_ordering.len() < physical_ordering_common.len()
+            {
+                physical_ordering_common = physical_ordering.to_vec();
+            }
+            let (can_skip_sorting, should_reverse) = can_skip_sort(
+                window_expr[0].partition_by(),
+                required_ordering,
+                &sort_input.schema(),
+                physical_ordering,
+            )?;
+            if !can_skip_sorting {
+                return Ok(None);
+            }
+            if let Some(first_should_reverse) = first_should_reverse {
+                if first_should_reverse != should_reverse {
+                    return Ok(None);
+                }
             } else {
-                Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    new_child,
-                    new_schema,
-                    partition_keys.to_vec(),
-                    Some(physical_ordering.to_vec()),
-                )?) as _
-            };
-            return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+                first_should_reverse = Some(should_reverse);
+            }
+        } else {
+            // If there is no physical ordering, there is no way to remove a
+            // sort, so immediately return.
+            return Ok(None);
         }
     }
+    let new_window_expr = if first_should_reverse.unwrap() {
+        window_expr
+            .iter()
+            .map(|e| e.get_reverse_expr())
+            .collect::<Option<Vec<_>>>()
+    } else {
+        Some(window_expr.to_vec())
+    };
+    if let Some(window_expr) = new_window_expr {
+        let requires_single_partition = matches!(
+            window_exec.required_input_distribution()[sort_tree.idx],
+            Distribution::SinglePartition
+        );
+        let new_child = remove_corresponding_sort_from_sub_plan(
+            sort_tree,
+            requires_single_partition,
+        )?;
+        let new_schema = new_child.schema();
+
+        let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
+        // If all window expressions can run with bounded memory, choose the
+        // bounded window variant:
+        let new_plan = if uses_bounded_memory {
+            Arc::new(BoundedWindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        } else {
+            Arc::new(WindowAggExec::try_new(
+                window_expr,
+                new_child,
+                new_schema,
+                partition_keys.to_vec(),
+                Some(physical_ordering_common),
+            )?) as _
+        };
+        return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+    }
     Ok(None)
 }
 
-/// Updates child to remove the unnecessary sorting below it.
-fn update_child_to_remove_unnecessary_sort(
+/// Updates child to remove the unnecessary `CoalescePartitions` below it.
+fn update_child_to_change_coalesce(
     child: &mut Arc<dyn ExecutionPlan>,
-    sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
+    coalesce_onwards: &mut Option<ExecTree>,
+    sort_exec: Option<&SortExec>,
 ) -> Result<()> {
-    if !sort_onwards.is_empty() {
-        *child = remove_corresponding_sort_from_sub_plan(sort_onwards)?;
+    if let Some(coalesce_onwards) = coalesce_onwards {
+        *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?;
     }
     Ok(())
 }
 
-/// Converts an [ExecutionPlan] trait object to a [SortExec] when possible.
-fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&SortExec> {
-    sort_any.as_any().downcast_ref::<SortExec>().ok_or_else(|| {
-        DataFusionError::Plan("Given ExecutionPlan is not a SortExec".to_string())
-    })
+/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
+fn change_corresponding_coalesce_in_sub_plan(
+    coalesce_onwards: &mut ExecTree,
+    sort_exec: Option<&SortExec>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    Ok(
+        if coalesce_onwards
+            .plan
+            .as_any()
+            .is::<CoalescePartitionsExec>()
+        {
+            // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
+            let coalesce_input = coalesce_onwards.plan.children()[0].clone();
+            if let Some(sort_exec) = sort_exec {
+                let sort_expr = sort_exec.expr();
+                if !ordering_satisfy(
+                    coalesce_input.output_ordering(),
+                    Some(sort_expr),
+                    || coalesce_input.equivalence_properties(),
+                ) {
+                    return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
+                }
+            }
+            coalesce_input

Review Comment:
   I believe most of the time the global `SortExec` + `CoalescePartitionsExec` are adjacent nodes.  But is it possible that there will be some Projections between the `SortExec` and `CoalescePartitionsExec`  which will make the SortExec has totally different exprs/columns with the `CoalescePartitionsExec's` input plan ?  If that is the case, `ordering_satisfy` will return false definitely and add `SortExec` on top of the `coalesce_input` will generate a invalid plan.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1415719070

   I hope to review this PR carefully today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099378094


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",

Review Comment:
   Yes, this is an unrelated behavior which only makes sense when cost of hashing is significant. It is on our roadmap to make `EnforceDistribution` smarter, maybe we can touch on this within that scope and make a single multi-threaded hash repartitioner that achieves the same purpose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099383349


##########
datafusion/common/src/config.rs:
##########
@@ -290,6 +290,17 @@ config_namespace! {
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
 
+        /// Should DataFusion parallelize Sort during physical plan creation.

Review Comment:
   I agree, just sent a commit with this renaming. There seems to be an intermittent CI failure, so if you can kick it off again when you get the chance that'd be great. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099957264


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,44 +128,189 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
+                .map(|item| item.plan.clone())
                 .collect::<Vec<_>>();
             let sort_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` are `SortExec`(Introduces ordering). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let output_ordering = plan.output_ordering();
+                    let required_orderings = plan.required_input_ordering();
+                    let children =
+                        izip!(&plan.children(), item.sort_onwards, required_orderings)
+                            .filter_map(|(child, element, required_ordering)| {
+                                // Executor maintains or partially maintains its child's output ordering
+                                let maintains = ordering_satisfy(
+                                    child.output_ordering(),
+                                    output_ordering,
+                                    || child.equivalence_properties(),
+                                );
+                                if (required_ordering.is_none() && maintains) || is_spm {
+                                    element
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect::<Vec<ExecTree>>();
+                    if !children.is_empty() {
+                        // Add parent node to the tree if there is at least one
+                        // child with a subtree:
+                        Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children,
+                        })
+                    } else {
+                        // There is no sort linkage for this child, do nothing.
+                        None
+                    }
+                })
+                .collect();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+
+/// This object is used within the [EnforceSorting] rule to track the closest
+/// `CoalescePartitionsExec` descendant(s) for every child of a plan.
+#[derive(Debug, Clone)]
+struct PlanWithCorrespondingCoalescePartitions {
+    plan: Arc<dyn ExecutionPlan>,
+    // For every child, keep a subtree of `ExecutionPlan`s starting from the
+    // child until the `CoalescePartitionsExec`(s) -- could be multiple for
+    // n-ary plans like Union -- that affect the output partitioning of the
+    // child. If the child has no connection to any `CoalescePartitionsExec`,
+    // simpliy store None (and not a subtree).
+    coalesce_onwards: Vec<Option<ExecTree>>,
+}
+
+impl PlanWithCorrespondingCoalescePartitions {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PlanWithCorrespondingCoalescePartitions {
+            plan,
+            coalesce_onwards: vec![None; length],
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| PlanWithCorrespondingCoalescePartitions::new(child))
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithCorrespondingCoalescePartitions {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_requirements = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            let children_plans = children_requirements
                 .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input ordering. If we are at
-                        // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && !element.is_empty() {
-                                return element.clone();
-                            }
+                .map(|item| item.plan.clone())
+                .collect();
+            let coalesce_onwards = children_requirements
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec`
+                    // operators. This tree collects all the intermediate executors that
+                    // maintain a single partition. If we just saw a `CoalescePartitionsExec`
+                    // operator, we reset the tree and start accumulating.
+                    let plan = item.plan;
+                    if plan.as_any().is::<CoalescePartitionsExec>() {
+                        Some(ExecTree {
+                            idx,
+                            plan,
+                            children: vec![],
+                        })
+                    } else if plan.children().is_empty() {
+                        // Plan has no children, there is nothing to propagate.
+                        None
+                    } else {
+                        let children = item
+                            .coalesce_onwards
+                            .into_iter()
+                            .flatten()
+                            .filter(|item| {
+                                // Only consider operators that don't require a
+                                // single partition.
+                                !matches!(
+                                    plan.required_input_distribution()[item.idx],
+                                    Distribution::SinglePartition
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        if children.is_empty() {
+                            None
+                        } else {
+                            Some(ExecTree {
+                                idx,
+                                plan,
+                                children,
+                            })
                         }
                     }
-                    vec![]
                 })
-                .collect::<Vec<_>>();
+                .collect();
             let plan = with_new_children_if_necessary(self.plan, children_plans)?;
-            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+            Ok(PlanWithCorrespondingCoalescePartitions {
+                plan,
+                coalesce_onwards,
+            })
         }
     }
 }
 
+/// The boolean flag `repartition_sorts` defined in the config indicates
+/// whether we elect to transform CoalescePartitionsExec + SortExec cascades
+/// into SortExec + SortPreservingMergeExec cascades, which enables us to
+/// perform sorting in parallel.
 impl PhysicalOptimizerRule for EnforceSorting {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Execute a post-order traversal to adjust input key ordering:
         let plan_requirements = PlanWithCorrespondingSort::new(plan);

Review Comment:
   Removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420986304

   @mingmwang, I don't follow why you think there is a a bug in [this example](https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420625925). The query does not specify a top level ordering, so limiting the top or bottom 5 are both fair game from a query perspective. Looking at it from a logical vs. physical plan perspective, I still do not see a problem. Order annotations in logical plan aggregations are local to them, they don't have a bearing on output ordering AFAIK. These annotations specify the frame, but that's where their scope ends. If they had any bearing on output ordering, they would have the same effect at the query level too, but they don't.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421228337

   I don't think this is a bug. Let's think about the converse scenario: The non-optimized query could have produced the other order (which would be valid), or the user could have changed the order of columns, and in that case we would have the illusion of preserving "the order" during optimization. In general, whenever there are multiple possibilities for what constitutes a valid query, there will always be some configurations where non-optimized plans and optimized plans differ (or agree) in under-constrained aspects.
   
   At the end of day, the optimizer's prime job is to end up with more efficient plans that obey the specification, not to conform to arbitrary behaviors of the non-optimized plan. In this case, there is simply no order in the specification, so I don't see a bug. The result is indeed correct.
   
   This being said, I think I understand the general suggestion you are making: In my words, I would put it this way: When there are multiple _equivalent_ optimizations, it is a good idea to choose the one that resembles the non-optimized query the most. I agree with this, and making progress towards this desiderate in follow-ons, refactors etc. would be very nice.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1419132292

   I am now *actually* reviewing this PR 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1097915435


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1980,8 +1980,8 @@ async fn left_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",

Review Comment:
   Makes sene



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1980,8 +1980,8 @@ async fn left_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
-                "  CoalescePartitionsExec",
+                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: [t1_id@0 ASC NULLS LAST]",

Review Comment:
   Makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1417419862

   I plan to review this PR carefully tomorrow


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420155549

   @alamb @ozankabak @mustafasrepo 
   
   Regarding the global sort replaced to a parallel version(SortPreservingMergeExec + Local Sort) optimization, I think there is already a rule `GlobalSortSelection` for the exact purpose. I think we should not let  the Sort Enforcement rule to handle this again.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1420398291

   > > @alamb @ozankabak @mustafasrepo
   > > Regarding the global sort replaced to a parallel version(SortPreservingMergeExec + Local Sort) optimization, I think there is already a rule `GlobalSortSelection` for the exact purpose. I think we should not let the Sort Enforcement rule to handle this again. Implement/enhance such optimization in the `GlobalSortSelection` rule is more straightforward and do not need to care the positions of the `CoalescePartitionsExec`.
   > 
   > I am not sure how we can do all the local sort + merge substitutions just with `GlobalSortSelection`, which doesn't track coalesce operations on partitions as you rightly point out. Note that we handle (and parallel-optimize) not just top level sorts, but sorts at any depth within the plan, even with intermediate executors in between the coalesce operation and the sort in question.
   > 
   > We will take a deeper look today and see if we can move over the logic to `GlobalSortSelection` while still preserving the same functionality. If we can, great -- if not, we will share an example that blocks this. Thank you for the suggestion 👍
   
   Yes, please take a look at the `GlobalSortSelection` rule. This rule does not need to care about the  position of `CoalescePartitionsExec` because `CoalescePartitionsExec`s are added by `EnforceDistribution` rule which is triggered after the `GlobalSortSelection` rule.  The physical Sort Selection should happen in a very early stage of the physical optimization phase. I guess why the current `GlobalSortSelection` does not optimize all the  `Global Sort` is because it is not that aggressive and has an additional check. If you comment that check, all the `Global Sort` should be replaced.
   
   ` && sort_exec.fetch().is_some() `
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421168600

   > I think this is because that global sort + CoalescePartitionsExec were added later by the two enforcement rules.
   An easy way to get ride from this is to run the GlobalSortSelection rule again after the two enforcement rules. I would prefer still let the GlobalSortSelection rule handle this optimization. Need to be enhance GlobalSortSelection rule to handle the SortExec + CoalescePartitionsExec combination.
   
   If we end up handling this combination there, and running it twice; it really diminishes the value of this approach. Maybe there is a way to do it elegantly, I will think about it in detail. If we (or you) can figure out a way to do this elegantly, we can go back to this approach; but for now, it doesn't look too good to me.
   
   > Another approach I can think is maybe we can have a specific handling in EnforceDistribution rule, if the plan 's distribution requirement is Distribution::SinglePartition and the plan also has some sorting requirements, add the prefer-parallel-sort configuration is on, add SortPreservingMergeExec + SortExec. If the SortExec is unnecessary, it will be removed later by the EnforceSorting rule
   
   I think this is interesting and sounds more promising to me. I will think about this today, maybe we can do this in a follow-on PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1097428376


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -33,48 +33,80 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::add_sort_above_child;
 use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{reverse_sort_options, DataFusionError};
 use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
-use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use itertools::izip;
+use itertools::{concat, izip};
 use std::iter::zip;
 use std::sync::Arc;
 
 /// This rule inspects SortExec's in the given physical plan and removes the
-/// ones it can prove unnecessary.
+/// ones it can prove unnecessary. The boolean flag `parallelize_sorts`
+/// indicates whether we elect to transform CoalescePartitionsExec + SortExec
+/// cascades into SortExec + SortPreservingMergeExec cascades, which enables
+/// us to perform sorting in parallel.
 #[derive(Default)]
-pub struct EnforceSorting {}
+pub struct EnforceSorting {
+    parallelize_sorts: bool,
+}
 
 impl EnforceSorting {
     #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(parallelize_sorts: bool) -> Self {

Review Comment:
   I didn't see any place in the code that ever crated an `EnforceSorting` with `parallelize_sorts = false` (aka I searched for `EnforceSorting::new(false)`. I may have missed it. 
   
   I think we should either remove the code that handles parallelize_sorts = false as it is not used/tested or better yet (as explained in the overall PR comments) that we add a new config option to control the behavior and add some basic tests. 



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -33,48 +33,80 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::add_sort_above_child;
 use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{reverse_sort_options, DataFusionError};
 use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
-use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use itertools::izip;
+use itertools::{concat, izip};
 use std::iter::zip;
 use std::sync::Arc;
 
 /// This rule inspects SortExec's in the given physical plan and removes the
-/// ones it can prove unnecessary.
+/// ones it can prove unnecessary. The boolean flag `parallelize_sorts`
+/// indicates whether we elect to transform CoalescePartitionsExec + SortExec
+/// cascades into SortExec + SortPreservingMergeExec cascades, which enables
+/// us to perform sorting in parallel.
 #[derive(Default)]
-pub struct EnforceSorting {}
+pub struct EnforceSorting {
+    parallelize_sorts: bool,
+}
 
 impl EnforceSorting {
     #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(parallelize_sorts: bool) -> Self {
+        Self { parallelize_sorts }
     }
 }
 
-/// This is a "data class" we use within the [EnforceSorting] rule that
-/// tracks the closest `SortExec` descendant for every child of a plan.
+/// This object implements a tree that we use while keeping track of paths

Review Comment:
   ❤️  for the improved comments 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org