Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/02/07 07:29:44 UTC

[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098266601


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -102,34 +134,170 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
                 .collect::<Result<Vec<_>>>()?;
             let children_plans = children_requirements
                 .iter()
-                .map(|elem| elem.plan.clone())
-                .collect::<Vec<_>>();
+                .map(|item| item.plan.clone())
+                .collect();
             let sort_onwards = children_requirements
-                .iter()
-                .map(|item| {
-                    let onwards = &item.sort_onwards;
-                    if !onwards.is_empty() {
-                        let flags = item.plan.maintains_input_order();
-                        // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
-                        // if the executors in between maintain input ordering. If we are at
-                        // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
-                        // However, we want to propagate them above anyway.
-                        for (maintains, element) in flags.into_iter().zip(onwards.iter())
-                        {
-                            if (maintains || is_sort(&item.plan)) && !element.is_empty() {
-                                return element.clone();
+                .into_iter()
+                .enumerate()
+                .map(|(idx, item)| {
+                    let plan = &item.plan;
+                    // Leaves of the `sort_onwards` tree are sort-introducing operators
+                    // (e.g. `SortExec`, `SortPreservingMergeExec`). This tree collects
+                    // all the intermediate executors that maintain this ordering. If
+                    // we just saw a sort-introducing operator, we reset the tree and
+                    // start accumulating.
+                    if is_sort(plan) {
+                        return Some(ExecTree {
+                            idx,
+                            plan: item.plan,
+                            children: vec![],
+                        });
+                    } else if is_limit(plan) {
+                        // There is no sort linkage for this path, it starts at a limit.
+                        return None;
+                    }
+                    let is_spm = is_sort_preserving_merge(plan);
+                    let is_union = plan.as_any().is::<UnionExec>();
+                    // If the executor is a `UnionExec` and it has an output ordering,
+                    // then it at least partially maintains some child's output ordering.
+                    // Therefore, we propagate this information upwards.
+                    let partially_maintains =
+                        is_union && plan.output_ordering().is_some();

Review Comment:
   I have changed the code according to your suggestion; it now uses properties of the plan. Thanks for the suggestion!
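The `sort_onwards` bookkeeping described in the code comments of the diff above can be illustrated with a small, self-contained Rust model. This is a sketch under stated assumptions, not the PR's implementation: `Op`, `Node`, and `sort_onwards` are hypothetical stand-ins rather than DataFusion types, a single `SortIntroducing` variant covers both `SortExec` and `SortPreservingMergeExec`, and one boolean per operator replaces the per-child flags returned by `maintains_input_order()`.

```rust
// Hypothetical, simplified model of the `sort_onwards` bookkeeping described in
// the diff comments. These types are NOT DataFusion's; they only mimic the idea.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    /// Sort-introducing operator (stands in for `SortExec` / `SortPreservingMergeExec`).
    SortIntroducing,
    /// A limit: no sort linkage passes through it.
    Limit,
    /// A union; `has_output_ordering` stands in for `output_ordering().is_some()`.
    Union { has_output_ordering: bool },
    /// Any other operator; one flag stands in for `maintains_input_order()`.
    Other { maintains_input_order: bool },
}

struct Node {
    op: Op,
    children: Vec<Node>,
}

#[derive(Debug, PartialEq)]
struct ExecTree {
    idx: usize, // position of this node among its parent's children
    op: Op,
    children: Vec<ExecTree>,
}

/// Returns the `sort_onwards` tree rooted at `node` (child number `idx` of its
/// parent), or `None` if no sort ordering is linked through it.
fn sort_onwards(idx: usize, node: &Node) -> Option<ExecTree> {
    let propagates = match node.op {
        // We just saw a sort-introducing operator: reset the tree, start a new leaf.
        Op::SortIntroducing => {
            return Some(ExecTree { idx, op: node.op, children: vec![] });
        }
        // The path starts at a limit, so there is no sort linkage.
        Op::Limit => return None,
        // A union with an output ordering at least partially maintains some child's ordering.
        Op::Union { has_output_ordering } => has_output_ordering,
        Op::Other { maintains_input_order } => maintains_input_order,
    };
    if !propagates {
        return None;
    }
    // Keep only the children that still carry a sort linkage.
    let children: Vec<ExecTree> = node
        .children
        .iter()
        .enumerate()
        .filter_map(|(i, child)| sort_onwards(i, child))
        .collect();
    if children.is_empty() {
        None
    } else {
        Some(ExecTree { idx, op: node.op, children })
    }
}

fn main() {
    // Other(maintains) -> Union(ordered) -> [Sort -> Other, Limit -> Sort]
    let plan = Node {
        op: Op::Other { maintains_input_order: true },
        children: vec![Node {
            op: Op::Union { has_output_ordering: true },
            children: vec![
                Node {
                    op: Op::SortIntroducing,
                    children: vec![Node {
                        op: Op::Other { maintains_input_order: false },
                        children: vec![],
                    }],
                },
                Node {
                    op: Op::Limit,
                    children: vec![Node { op: Op::SortIntroducing, children: vec![] }],
                },
            ],
        }],
    };
    let tree = sort_onwards(0, &plan).expect("a sort is linked to the root");
    println!("{:?}", tree);
}
```

In the example plan, the union's first input (a sort) stays linked to the root, while the branch that passes through the limit is cut off, matching what the diff comments describe for limits and ordered unions.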


