Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/02/15 16:27:07 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request, #5290: TopDown EnforceSorting implementation

mingmwang opened a new pull request, #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290

   # Which issue does this PR close?
   
   
   Closes #5289.
   
   # Rationale for this change
   
   
   # What changes are included in this PR?
   
   
   # Are these changes tested?
   
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1114083510


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach to inspect the SortExecs in the given physical plan and remove the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a Top-Down process(Preorder Traversal) to ensure the sort requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Execute a Top-Down process(Preorder Traversal) to remove all the unnecessary Sort
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
+        if let Some(child_sort_exec) =
+            sort_pres_exec.input().as_any().downcast_ref::<SortExec>()
+        {
+            if sort_pres_exec.expr() == child_sort_exec.expr() {
+                if let Some(result) =
+                    analyze_immediate_sort_removal(&requirements, child_sort_exec)
+                {
+                    return Ok(Some(result));
+                }
+            }
+        } else if !requirements.satisfy_single_distribution
+            || sort_pres_exec
+                .input()
+                .output_partitioning()
+                .partition_count()
+                <= 1
+        {
+            if let Some(result) =
+                analyze_immediate_spm_removal(&requirements, sort_pres_exec)
+            {
+                return Ok(Some(result));
+            }
+        }
+    }
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
+        plan.equivalence_properties()
+    }) {
+        // Can satisfy the parent requirements, change the adjusted_request_ordering for UnionExec and WindowAggExec(BoundedWindowAggExec)
+        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+            // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
+            // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
+            let adjusted = new_sort_requirements(union_exec.output_ordering());
+            return Ok(Some(PlanWithSortRequirements {
+                required_ordering: None,
+                adjusted_request_ordering: vec![
+                    adjusted;
+                    requirements
+                        .adjusted_request_ordering
+                        .len()
+                ],
+                ..requirements
+            }));
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {

Review Comment:
   sure





[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1432298878

   The PR is very large in scope. It changes parts of the old code (and certainly makes some changes to its tests), and also adds new code (and new tests). It would be much easier to review if it were broken down into two PRs: the first would only replicate the current functionality, with no functionality regressions and no test changes at all, and the second would add the new functionality. Right now, the new rule is significantly longer than the old rule (which is bad), but it offers more functionality (which is great). So is switching from bottom-up to top-down a good change or a bad change? We can't tell easily.
   
   Now, let me share my (very) preliminary impression after a cursory look: I see that it has better handling of sort preserving merges, smarter push-down of sorts under unions, and adds support for sort merge joins. These are the good bits. The cons are that it seems to regress on some cases where it was doing better before. In most of these cases the stated reason is preserving global output ordering, but I am not sure I agree with that.
   
   Anyway, we will disentangle and review in detail, but I want to give you a heads up that this will take some time. We will need to analyze every case carefully, go back to the old version of the code (and tests), compare and contrast etc. Before we form an idea on the merits of bottom-up vs. top-down, our goal will be to create *two functionally equal* implementations passing *exactly the same* test suite. Without that, it is not possible to objectively decide.
   
   Whatever the result on bottom-up vs. top-down is, I think this exercise will end up making the rule better, so that's great. I will keep you posted as we make progress in the upcoming days.




[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1441260533

   > And in the TopDown rule implementation, I can add another configuration flag to enable removing Sorts more aggressively and achieve the same results as the Bottom-up rule, but we still need to define clearly in which cases Sort can be removed.
   
   This would be great, thank you. We will perform one more pass of analysis with that and get clarity on what else is left (if anything).




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112611424


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();

Review Comment:
   Patterns in the form: `plan.as_any().downcast_ref::<ExecName>().is_some()` can be converted to `plan.as_any().is::<ExecName>()`.
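
The equivalence behind this suggestion can be checked in isolation with the standard library alone. The following is a minimal sketch, assuming hypothetical stand-in types (`GlobalLimit`, `Filter`) and a toy `Plan` trait that only mirrors the `as_any` accessor; none of these are DataFusion's real definitions. `is::<T>()` is a method on `dyn Any` in `std::any`.

```rust
use std::any::Any;

// Hypothetical stand-ins for operator types; not DataFusion's real structs.
struct GlobalLimit;
struct Filter;

// Toy trait that mirrors only the `as_any` accessor pattern seen in the diff.
trait Plan {
    fn as_any(&self) -> &dyn Any;
}

impl Plan for GlobalLimit {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Plan for Filter {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Verbose form: downcast to a reference, then discard it.
fn is_limit_verbose(plan: &dyn Plan) -> bool {
    plan.as_any().downcast_ref::<GlobalLimit>().is_some()
}

// Concise form: ask directly whether the concrete type matches.
fn is_limit_concise(plan: &dyn Plan) -> bool {
    plan.as_any().is::<GlobalLimit>()
}

fn main() {
    let plans: Vec<Box<dyn Plan>> = vec![Box::new(GlobalLimit), Box::new(Filter)];
    for plan in &plans {
        // Both forms agree on every plan node.
        assert_eq!(is_limit_verbose(plan.as_ref()), is_limit_concise(plan.as_ref()));
    }
}
```

The concise form is only a drop-in replacement where the downcast result is discarded; call sites that bind the reference (for example `if let Some(sort_exec) = ...downcast_ref::<SortExec>()`) still need `downcast_ref`.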





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1441246998

   
   Test name | TopDown Rule | Bottom Up Rule | Desc
   -- | -- | -- | --
   test_remove_unnecessary_sort4 | PASS | FAIL | Top-Down result is better: it removes an unnecessary `SortPreservingMergeExec`
   test_remove_unnecessary_sort6 | PASS | FAIL |
   test_union_inputs_different_sorted2 | PASS | FAIL | The results are actually the same; only the plan shape differs
   test_union_inputs_different_sorted4 | PASS | FAIL | Same as above
   test_union_inputs_different_sorted6 | PASS | FAIL | Top-Down result is better
   test_sort_merge_join_order_by_left | PASS | FAIL |
   test_sort_merge_join_order_by_right | PASS | FAIL |
   test_not_remove_top_sort_window_multilayer | FAIL | PASS | Top-Down rule does not remove the Sort, in order to preserve ordering (`maintains_input_order`)
   test_multiple_sort_window_exec | PASS | FAIL |
   test_window_multi_path_sort | FAIL | PASS | Top-Down rule does not remove the Sort, in order to preserve ordering (`maintains_input_order`)
   test_multilayer_coalesce_partitions | FAIL | PASS | With the Top-Down rule, global Sort optimization is now achieved by the combination of `GlobalSortSelection`, `EnforceDistribution` and `EnforceSorting`
   test_coalesce_propagate | FAIL | PASS | Same as above
   test_commutativity | FAIL | PASS | Same as above
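
For readers less familiar with what "removing an unnecessary Sort" means in these tests, here is a toy sketch. It assumes a hypothetical three-node `Plan` enum and uses a simple recursive rewrite; it is not the PR's algorithm (which propagates requirements top-down and uses a `TombStoneExec` placeholder), and it ignores partitioning, equivalence properties and sort options entirely. All names in it are illustrative.

```rust
// Toy plan tree; hypothetical and far simpler than DataFusion's ExecutionPlan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    // A source that already emits rows ordered by these columns (possibly none).
    Scan { ordering: Vec<String> },
    // A sort that orders its input by `keys`.
    Sort { keys: Vec<String>, input: Box<Plan> },
    // An order-preserving operator (think of a filter).
    Filter { input: Box<Plan> },
}

impl Plan {
    fn output_ordering(&self) -> Vec<String> {
        match self {
            Plan::Scan { ordering } => ordering.clone(),
            Plan::Sort { keys, .. } => keys.clone(),
            Plan::Filter { input } => input.output_ordering(),
        }
    }
}

// A requirement is satisfied when it is a prefix of the provided ordering.
fn satisfies(provided: &[String], required: &[String]) -> bool {
    provided.starts_with(required)
}

// Drops sorts sitting directly under another sort: the parent re-sorts
// everything, so their result is overwritten.
fn strip_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { input, .. } => strip_sorts(*input),
        other => other,
    }
}

fn remove_redundant_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { keys, input } => {
            let child = remove_redundant_sorts(strip_sorts(*input));
            if satisfies(&child.output_ordering(), &keys) {
                // The input is already ordered as required: drop this sort.
                child
            } else {
                Plan::Sort { keys, input: Box::new(child) }
            }
        }
        Plan::Filter { input } => Plan::Filter {
            input: Box::new(remove_redundant_sorts(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}

// Small helper to build column lists.
fn cols(names: &[&str]) -> Vec<String> {
    names.iter().map(|n| n.to_string()).collect()
}

fn main() {
    // Sort[a] over Filter over Scan ordered by [a, b]: the sort is redundant.
    let scan = Plan::Scan { ordering: cols(&["a", "b"]) };
    let plan = Plan::Sort {
        keys: cols(&["a"]),
        input: Box::new(Plan::Filter { input: Box::new(scan.clone()) }),
    };
    let expected = Plan::Filter { input: Box::new(scan) };
    assert_eq!(remove_redundant_sorts(plan), expected);
}
```

The real rule also has to decide whether a Sort influences the final result ordering, which is where the `maintains_input_order` rows in the table come from; this toy has no notion of that.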
   
   




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1114103949


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach to inspect the SortExecs in the given physical plan and remove the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Whether the parent has a SinglePartition distribution requirement on its children
+    satisfy_single_distribution: bool,
+    /// Sort ordering required by the parent
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted sort ordering requested from the children.
+    /// By default it is the same as the plan's required input ordering, but it can be adjusted based on the parent's required sort ordering.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Run a top-down (pre-order) traversal to enforce the sort requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Run a second top-down (pre-order) traversal to remove all unnecessary SortExecs:
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
+        if let Some(child_sort_exec) =
+            sort_pres_exec.input().as_any().downcast_ref::<SortExec>()
+        {
+            if sort_pres_exec.expr() == child_sort_exec.expr() {
+                if let Some(result) =
+                    analyze_immediate_sort_removal(&requirements, child_sort_exec)
+                {
+                    return Ok(Some(result));
+                }
+            }
+        } else if !requirements.satisfy_single_distribution
+            || sort_pres_exec
+                .input()
+                .output_partitioning()
+                .partition_count()
+                <= 1
+        {
+            if let Some(result) =
+                analyze_immediate_spm_removal(&requirements, sort_pres_exec)
+            {
+                return Ok(Some(result));
+            }
+        }
+    }
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
+        plan.equivalence_properties()
+    }) {
+        // The plan satisfies the parent requirements. Only the adjusted_request_ordering of UnionExec and
+        // WindowAggExec (BoundedWindowAggExec) may still need to change.
+        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+            // UnionExec has no real sort requirements on its inputs. Here we set the adjusted_request_ordering to the UnionExec's output ordering and
+            // propagate it down so that unnecessary descendant SortExecs under the UnionExec can be removed.
+            let adjusted = new_sort_requirements(union_exec.output_ordering());
+            return Ok(Some(PlanWithSortRequirements {
+                required_ordering: None,
+                adjusted_request_ordering: vec![
+                    adjusted;
+                    requirements
+                        .adjusted_request_ordering
+                        .len()
+                ],
+                ..requirements
+            }));
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            // WindowAggExec(BoundedWindowAggExec) might reverse their sort requirements
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let reversed_request_child = reverse_window_sort_requirements(request_child);
+
+            if should_reverse_window_sort_requirements(
+                plan.clone(),
+                request_child,
+                reversed_request_child.as_deref(),
+            ) {
+                let WindowExecInfo {
+                    window_expr,
+                    input_schema,
+                    partition_keys,
+                } = extract_window_info_from_plan(plan).unwrap();
+
+                let new_window_expr = window_expr
+                    .iter()
+                    .map(|e| e.get_reverse_expr())
+                    .collect::<Option<Vec<_>>>();
+                let new_physical_ordering = create_sort_expr_from_requirement(
+                    reversed_request_child.clone().unwrap().as_ref(),
+                );
+                if let Some(window_expr) = new_window_expr {
+                    let uses_bounded_memory =
+                        window_expr.iter().all(|e| e.uses_bounded_memory());
+                    // If all window expressions can run with bounded memory, choose the
+                    // bounded window variant:
+                    let new_plan = if uses_bounded_memory {
+                        Arc::new(BoundedWindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    } else {
+                        Arc::new(WindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    };
+                    return Ok(Some(PlanWithSortRequirements {
+                        plan: new_plan,
+                        impact_result_ordering: false,
+                        satisfy_single_distribution: requirements
+                            .satisfy_single_distribution,
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![reversed_request_child],
+                    }));
+                }
+            }
+        }
+        Ok(Some(PlanWithSortRequirements {
+            required_ordering: None,
+            ..requirements
+        }))
+    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        // If the current plan is a SortExec, modify current SortExec to satisfy the parent requirements
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let mut new_plan = sort_exec.input.clone();
+        add_sort_above(&mut new_plan, parent_required_expr)?;
+        Ok(Some(
+            PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+        ))
+    } else {
+        // The plan cannot satisfy the parent requirements. Check whether they can be pushed down; if not, add a new SortExec.
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let maintains_input_order = plan.maintains_input_order();
+        // If the current plan is a leaf node or cannot maintain any of its input orderings, the requirements cannot be pushed down.
+        // For RepartitionExec, we never push down the sort requirements, even though a RepartitionExec with a single input partition could maintain the input ordering.
+        // FilterExec, GlobalLimitExec and LocalLimitExec are treated the same way.
+        // For UnionExec, we can always push the requirements down.
+        if (maintains_input_order.is_empty()
+            || !maintains_input_order.iter().any(|o| *o)
+            || plan.as_any().downcast_ref::<RepartitionExec>().is_some()
+            || plan.as_any().downcast_ref::<FilterExec>().is_some()
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some())
+            && plan.as_any().downcast_ref::<UnionExec>().is_none()
+        {
+            let mut new_plan = plan.clone();
+            add_sort_above(&mut new_plan, parent_required_expr)?;
+            Ok(Some(
+                PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+            ))
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let child_plan = plan.children()[0].clone();
+            match determine_children_requirement(
+                parent_required,
+                request_child,
+                child_plan,
+            ) {
+                RequirementsCompatibility::Satisfy => Ok(None),
+                RequirementsCompatibility::Compatible(adjusted) => {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![adjusted],
+                        ..requirements
+                    }))
+                }
+                RequirementsCompatibility::NonCompatible => {
+                    let WindowExecInfo {
+                        window_expr,
+                        input_schema,
+                        partition_keys,
+                    } = extract_window_info_from_plan(plan).unwrap();
+                    if should_reverse_window_exec(
+                        parent_required,
+                        request_child,
+                        &input_schema,
+                    ) {
+                        let new_physical_ordering = parent_required_expr.to_vec();
+                        let new_window_expr = window_expr
+                            .iter()
+                            .map(|e| e.get_reverse_expr())
+                            .collect::<Option<Vec<_>>>();
+                        if let Some(window_expr) = new_window_expr {
+                            let uses_bounded_memory =
+                                window_expr.iter().all(|e| e.uses_bounded_memory());
+                            let new_plan = if uses_bounded_memory {
+                                Arc::new(BoundedWindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            } else {
+                                Arc::new(WindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            };
+                            let adjusted_request_ordering =
+                                new_plan.required_input_ordering();
+                            return Ok(Some(PlanWithSortRequirements {
+                                plan: new_plan,
+                                impact_result_ordering: false,
+                                satisfy_single_distribution: requirements
+                                    .satisfy_single_distribution,
+                                required_ordering: None,
+                                adjusted_request_ordering,
+                            }));
+                        }
+                    }
+                    // Cannot push down the requirements; add a new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
+            // If the current plan is SortMergeJoinExec
+            let left_columns_len = smj.left.schema().fields().len();
+            let expr_source_side =
+                expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
+            match expr_source_side {
+                Some(JoinSide::Left) if maintains_input_order[0] => {
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        parent_required,
+                        parent_required_expr,
+                        JoinSide::Left,
+                    )
+                }
+                Some(JoinSide::Right) if maintains_input_order[1] => {
+                    let new_right_required = match smj.join_type {
+                        JoinType::Inner | JoinType::Right => shift_right_required(
+                            parent_required.unwrap(),
+                            left_columns_len,
+                        )?,
+                        JoinType::RightSemi | JoinType::RightAnti => {
+                            parent_required.unwrap().to_vec()
+                        }
+                        _ => Err(DataFusionError::Plan(
+                            "Unexpected SortMergeJoin type here".to_string(),
+                        ))?,
+                    };
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        Some(new_right_required.deref()),
+                        parent_required_expr,
+                        JoinSide::Right,
+                    )
+                }
+                _ => {
+                    // Cannot determine which side of the SortMergeJoinExec the expressions come from, so the requirements cannot be pushed down; add a SortExec.
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if plan.required_input_ordering().iter().any(Option::is_some) {
+            let plan_children = plan.children();
+            let compatible_with_children = izip!(
+                maintains_input_order.iter(),
+                plan.required_input_ordering().into_iter(),
+                plan_children.iter()
+            )
+            .map(|(can_push_down, request_child, child)| {
+                if *can_push_down {
+                    determine_children_requirement(
+                        parent_required,
+                        request_child.as_deref(),
+                        child.clone(),
+                    )
+                } else {
+                    RequirementsCompatibility::NonCompatible
+                }
+            })
+            .collect::<Vec<_>>();
+            if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Satisfy))
+            {
+                // The requirements are already satisfied; no need to push them down.
+                Ok(None)
+            } else if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Compatible(_)))
+            {
+                // Adjust child requirements and push down the requirements
+                let adjusted = parent_required.map(|r| r.to_vec());
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![adjusted; plan_children.len()],
+                    ..requirements
+                }))
+            } else {
+                // Cannot push down the requirements; add a new SortExec
+                let mut new_plan = plan.clone();
+                add_sort_above(&mut new_plan, parent_required_expr)?;
+                Ok(Some(
+                    PlanWithSortRequirements::new_without_impact_result_ordering(
+                        new_plan,
+                    ),
+                ))
+            }
+        } else {
+            // The current plan has no ordering requirements of its own on its children, so consider pushing down the parent requirements
+            if let Some(ProjectionExec { expr, .. }) =
+                plan.as_any().downcast_ref::<ProjectionExec>()
+            {
+                // For a ProjectionExec, first map the requirements onto the columns before the projection,
+                // then push the mapped requirements down
+                let new_adjusted =
+                    map_requirement_before_projection(parent_required, expr);
+                if new_adjusted.is_some() {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![new_adjusted],
+                        ..requirements
+                    }))
+                } else {
+                    // Cannot push down the requirements; add a new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            } else {
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![
+                        requirements.required_ordering;
+                        requirements
+                            .adjusted_request_ordering
+                            .len()
+                    ],
+                    ..requirements
+                }))
+            }
+        }
+    }
+}
+
+/// Analyzes a given `SortExec` (`sort_exec`) to determine whether it can be removed. It can be removed if either:
+/// 1) The input already has an ordering at least as fine as the one this `SortExec` enforces, or
+/// 2) The `SortExec` does not impact the final result ordering.
+fn analyze_immediate_sort_removal(
+    requirements: &PlanWithSortRequirements,
+    sort_exec: &SortExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        sort_exec.input().output_ordering(),
+        sort_exec.output_ordering(),
+        || sort_exec.input().equivalence_properties(),
+    ) {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+            impact_result_ordering: requirements.impact_result_ordering,
+            satisfy_single_distribution: requirements.satisfy_single_distribution,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortExec
+    else if !requirements.impact_result_ordering {
+        if requirements.satisfy_single_distribution
+            && sort_exec.input().output_partitioning().partition_count() > 1
+        {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(CoalescePartitionsExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        } else {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        }
+    } else {
+        None
+    }
+}
+
+/// Analyzes a given `SortPreservingMergeExec` (`spm_exec`) to determine whether it can be removed. It can be removed if either:
+/// 1) The input has at most one partition and already has an ordering at least as fine as the one this `SortPreservingMergeExec` enforces, or
+/// 2) The `SortPreservingMergeExec` does not impact the final result ordering.
+fn analyze_immediate_spm_removal(
+    requirements: &PlanWithSortRequirements,
+    spm_exec: &SortPreservingMergeExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        spm_exec.input().output_ordering(),
+        Some(spm_exec.expr()),
+        || spm_exec.input().equivalence_properties(),
+    ) && spm_exec.input().output_partitioning().partition_count() <= 1
+    {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: true,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortPreservingMergeExec only
+    else if !requirements.impact_result_ordering {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    } else {
+        None
+    }
+}
+
+/// Determines the requirements to impose on the children:
+/// If the children's requirements are more specific, do not push down the parent requirements.
+/// If the parent requirements are more specific, push down the parent requirements.
+/// If the two are not compatible, a Sort needs to be added.
+fn determine_children_requirement(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    request_child: Option<&[PhysicalSortRequirements]>,
+    child_plan: Arc<dyn ExecutionPlan>,
+) -> RequirementsCompatibility {
+    if requirements_compatible(request_child, parent_required, || {
+        child_plan.equivalence_properties()
+    }) {
+        // request child requirements are more specific, no need to push down the parent requirements
+        RequirementsCompatibility::Satisfy
+    } else if requirements_compatible(parent_required, request_child, || {
+        child_plan.equivalence_properties()
+    }) {
+        // parent requirements are more specific, adjust the request child requirements and push down the new requirements
+        let adjusted = parent_required.map(|r| r.to_vec());
+        RequirementsCompatibility::Compatible(adjusted)
+    } else {
+        RequirementsCompatibility::NonCompatible
+    }
+}
+
+/// Compares window expression's `window_request` and `parent_required_expr` ordering, returns
+/// whether we should reverse the window expression's ordering in order to meet parent's requirements.
+fn check_alignment(
+    input_schema: &SchemaRef,
+    window_request: &PhysicalSortRequirements,
+    parent_required_expr: &PhysicalSortRequirements,
+) -> bool {
+    if parent_required_expr.expr.eq(&window_request.expr)
+        && window_request.sort_options.is_some()
+        && parent_required_expr.sort_options.is_some()
+    {
+        let nullable = parent_required_expr.expr.nullable(input_schema).unwrap();
+        let window_request_opts = window_request.sort_options.unwrap();
+        let parent_required_opts = parent_required_expr.sort_options.unwrap();
+        if nullable {
+            window_request_opts == reverse_sort_options(parent_required_opts)
+        } else {
+            // If the column is not nullable, NULLS FIRST/LAST is not important.
+            window_request_opts.descending != parent_required_opts.descending
+        }
+    } else {
+        false
+    }
+}
+
+fn reverse_window_sort_requirements(
+    request_child: Option<&[PhysicalSortRequirements]>,
+) -> Option<Vec<PhysicalSortRequirements>> {
+    request_child.map(|request| {
+        request
+            .iter()
+            .map(|req| match req.sort_options {
+                None => req.clone(),
+                Some(ops) => PhysicalSortRequirements {
+                    expr: req.expr.clone(),
+                    sort_options: Some(reverse_sort_options(ops)),
+                },
+            })
+            .collect::<Vec<_>>()
+    })
+}
+
+/// Whether to reverse the top WindowExec's sort requirements.
+/// Takes into account the requirements of the descendant WindowExecs and the output ordering of the leaf nodes.
+/// TODO: consider all the cases
+fn should_reverse_window_sort_requirements(
+    window_plan: Arc<dyn ExecutionPlan>,
+    top_requirement: Option<&[PhysicalSortRequirements]>,
+    top_reversed_requirement: Option<&[PhysicalSortRequirements]>,
+) -> bool {
+    if top_requirement.is_none() {
+        return false;
+    }
+    let WindowExecInfo { window_expr, .. } =
+        extract_window_info_from_plan(&window_plan).unwrap();
+    let reverse_window_expr = window_expr
+        .iter()
+        .map(|e| e.get_reverse_expr())
+        .collect::<Option<Vec<_>>>();
+    if reverse_window_expr.is_none() {
+        return false;
+    }
+    let flags = window_plan
+        .children()
+        .into_iter()
+        .map(|child| {
+            // If the child is a leaf node, check its output ordering
+            if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                false
+            } else if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_reversed_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                true
+            } else if child.as_any().downcast_ref::<WindowAggExec>().is_some()
+                || child
+                    .as_any()
+                    .downcast_ref::<BoundedWindowAggExec>()
+                    .is_some()
+            {
+                // If the child is WindowExec, check the child requirements
+                if requirements_compatible(
+                    top_requirement,
+                    child.required_input_ordering()[0].as_deref(),
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    child.required_input_ordering()[0].as_deref(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    top_reversed_requirement,
+                    child.required_input_ordering()[0].as_deref(),
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    child.required_input_ordering()[0].as_deref(),
+                    top_reversed_requirement,
+                    || child.equivalence_properties(),
+                ) {
+                    should_reverse_window_sort_requirements(
+                        child,
+                        top_requirement,
+                        top_reversed_requirement,
+                    )
+                } else {
+                    requirements_compatible(
+                        top_reversed_requirement,
+                        window_plan.required_input_ordering()[0].as_deref(),
+                        || window_plan.equivalence_properties(),
+                    ) || requirements_compatible(
+                        window_plan.required_input_ordering()[0].as_deref(),
+                        top_reversed_requirement,
+                        || window_plan.equivalence_properties(),
+                    )
+                }
+            } else {
+                requirements_compatible(
+                    top_reversed_requirement,
+                    window_plan.required_input_ordering()[0].as_deref(),
+                    || window_plan.equivalence_properties(),
+                ) || requirements_compatible(
+                    window_plan.required_input_ordering()[0].as_deref(),
+                    top_reversed_requirement,
+                    || window_plan.equivalence_properties(),
+                )
+            }
+        })
+        .collect::<Vec<_>>();
+
+    flags.iter().all(|o| *o)
+}
+
+fn should_reverse_window_exec(
+    required: Option<&[PhysicalSortRequirements]>,
+    request_ordering: Option<&[PhysicalSortRequirements]>,
+    input_schema: &SchemaRef,
+) -> bool {
+    match (required, request_ordering) {
+        (_, None) => false,
+        (None, Some(_)) => false,
+        (Some(required), Some(request_ordering)) => {
+            if required.len() > request_ordering.len() {
+                return false;
+            }
+            let alignment_flags = required
+                .iter()
+                .zip(request_ordering.iter())
+                .filter_map(|(required_expr, request_expr)| {
+                    // Only check the alignment of non-partition columns
+                    if request_expr.sort_options.is_some()
+                        && required_expr.sort_options.is_some()
+                    {
+                        Some(check_alignment(input_schema, request_expr, required_expr))
+                    } else if request_expr.expr.eq(&required_expr.expr) {
+                        None
+                    } else {
+                        Some(false)
+                    }
+                })
+                .collect::<Vec<_>>();
+            if alignment_flags.is_empty() {
+                false
+            } else {
+                alignment_flags.iter().all(|o| *o)
+            }
+        }
+    }
+}
+
+fn extract_window_info_from_plan(
+    plan: &Arc<dyn ExecutionPlan>,
+) -> Option<WindowExecInfo> {

Review Comment:
   Sure
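
For readers following the hunk above, the requirement-reversal step can be sketched in isolation. This is a minimal sketch, not the PR's code: `SortOptions`, `SortRequirement`, `reverse_options`, and `reverse_requirements` are simplified, hypothetical stand-ins for `PhysicalSortRequirements`, `reverse_sort_options`, and `reverse_window_sort_requirements`, and it assumes that reversing an ordering flips both the direction and the null placement. Entries without sort options (partition-only columns) pass through unchanged, as in the hunk.

```rust
// Simplified stand-ins for the DataFusion types (assumptions for illustration).
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[derive(Debug, Clone, PartialEq)]
struct SortRequirement {
    // The real type holds a physical expression; a column name is enough here.
    expr: String,
    // `None` means "any ordering on this column is acceptable".
    sort_options: Option<SortOptions>,
}

// Assumed behavior: reversing flips both the direction and the null placement.
fn reverse_options(opts: SortOptions) -> SortOptions {
    SortOptions {
        descending: !opts.descending,
        nulls_first: !opts.nulls_first,
    }
}

// Reverse every requirement that carries sort options; leave the rest as-is.
fn reverse_requirements(
    reqs: Option<&[SortRequirement]>,
) -> Option<Vec<SortRequirement>> {
    reqs.map(|reqs| {
        reqs.iter()
            .map(|req| SortRequirement {
                expr: req.expr.clone(),
                sort_options: req.sort_options.map(reverse_options),
            })
            .collect()
    })
}
```

Mapping over the `Option` keeps the "no requirement" case (`None`) distinct from an empty requirement list, which mirrors how the hunk treats a missing child request.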





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1447521049

   > FYI @mingmwang, we created an implementation sketch combining bottom-up and top-down approaches, which passes the whole (unified) test set. You can find it [here](https://github.com/synnada-ai/arrow-datafusion/pull/55). We created this so that it can give us a baseline/goal in our quest to achieve full functionality with the simplest design possible.
   > 
   > When you add the global ordering/window expr ordering functionality, we will do one more analysis and comparison with that baseline and give more feedback to you.
   
   Sure, I will take a closer look today and tomorrow.




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1114075188


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a top-down approach to inspect the SortExecs in the given physical plan and
+/// remove the ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()

Review Comment:
   👍

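The sort-removal example in the module documentation quoted above (a `SortExec` directly below another `SortExec`) can be illustrated with a toy plan tree. This is a minimal sketch under stated assumptions: `Plan` and `remove_shadowed_sorts` are hypothetical names, and the check is purely structural, whereas the rule in the PR compares orderings through helpers such as `ordering_satisfy`.

```rust
// A toy plan tree (hypothetical; not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Sort { key: String, input: Box<Plan> },
}

// Drop any sort that sits directly below another sort: the parent sort
// re-orders all rows anyway, so the child's ordering is overwritten.
fn remove_shadowed_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { key, input } => {
            // Simplify the subtree first, then skip a sort directly below.
            let child = match remove_shadowed_sorts(*input) {
                Plan::Sort { input: grandchild, .. } => *grandchild,
                other => other,
            };
            Plan::Sort { key, input: Box::new(child) }
        }
        other => other,
    }
}
```

Applied to the fragment from the documentation, the inner sort on `non_nullable_col` disappears and the outer sort on `nullable_col` ends up directly above the scan.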




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112652213


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a top-down approach to inspect the SortExecs in the given physical plan and
+/// remove the ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a top-down (preorder) traversal to enforce the sort requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Execute a top-down (preorder) traversal to remove all the unnecessary SortExecs
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
+        if let Some(child_sort_exec) =
+            sort_pres_exec.input().as_any().downcast_ref::<SortExec>()
+        {
+            if sort_pres_exec.expr() == child_sort_exec.expr() {
+                if let Some(result) =
+                    analyze_immediate_sort_removal(&requirements, child_sort_exec)
+                {
+                    return Ok(Some(result));
+                }
+            }
+        } else if !requirements.satisfy_single_distribution
+            || sort_pres_exec
+                .input()
+                .output_partitioning()
+                .partition_count()
+                <= 1
+        {
+            if let Some(result) =
+                analyze_immediate_spm_removal(&requirements, sort_pres_exec)
+            {
+                return Ok(Some(result));
+            }
+        }
+    }
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
+        plan.equivalence_properties()
+    }) {
+        // Can satisfy the parent requirements, change the adjusted_request_ordering for UnionExec and WindowAggExec(BoundedWindowAggExec)
+        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+            // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
+            // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
+            let adjusted = new_sort_requirements(union_exec.output_ordering());
+            return Ok(Some(PlanWithSortRequirements {
+                required_ordering: None,
+                adjusted_request_ordering: vec![
+                    adjusted;
+                    requirements
+                        .adjusted_request_ordering
+                        .len()
+                ],
+                ..requirements
+            }));
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            // WindowAggExec(BoundedWindowAggExec) might reverse their sort requirements
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let reversed_request_child = reverse_window_sort_requirements(request_child);
+
+            if should_reverse_window_sort_requirements(
+                plan.clone(),
+                request_child,
+                reversed_request_child.as_deref(),
+            ) {
+                let WindowExecInfo {
+                    window_expr,
+                    input_schema,
+                    partition_keys,
+                } = extract_window_info_from_plan(plan).unwrap();
+
+                let new_window_expr = window_expr
+                    .iter()
+                    .map(|e| e.get_reverse_expr())
+                    .collect::<Option<Vec<_>>>();
+                let new_physical_ordering = create_sort_expr_from_requirement(
+                    reversed_request_child.clone().unwrap().as_ref(),
+                );
+                if let Some(window_expr) = new_window_expr {
+                    let uses_bounded_memory =
+                        window_expr.iter().all(|e| e.uses_bounded_memory());
+                    // If all window expressions can run with bounded memory, choose the
+                    // bounded window variant:
+                    let new_plan = if uses_bounded_memory {
+                        Arc::new(BoundedWindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    } else {
+                        Arc::new(WindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    };
+                    return Ok(Some(PlanWithSortRequirements {
+                        plan: new_plan,
+                        impact_result_ordering: false,
+                        satisfy_single_distribution: requirements
+                            .satisfy_single_distribution,
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![reversed_request_child],
+                    }));
+                }
+            }
+        }
+        Ok(Some(PlanWithSortRequirements {
+            required_ordering: None,
+            ..requirements
+        }))
+    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        // If the current plan is a SortExec, modify current SortExec to satisfy the parent requirements
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let mut new_plan = sort_exec.input.clone();
+        add_sort_above(&mut new_plan, parent_required_expr)?;
+        Ok(Some(
+            PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+        ))
+    } else {
+        // Cannot satisfy the parent requirements; check whether they can be pushed down. If not, add a new SortExec.
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let maintains_input_order = plan.maintains_input_order();
+        // If the current plan is a leaf node or cannot maintain any of its input orderings, the requirements cannot be pushed down.
+        // For RepartitionExec, we always choose not to push down the sort requirements, even though a RepartitionExec(input_partition=1) could maintain the input ordering.
+        // For UnionExec, we can always push them down.
+        if (maintains_input_order.is_empty()
+            || !maintains_input_order.iter().any(|o| *o)
+            || plan.as_any().downcast_ref::<RepartitionExec>().is_some()
+            || plan.as_any().downcast_ref::<FilterExec>().is_some()
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some())
+            && plan.as_any().downcast_ref::<UnionExec>().is_none()
+        {
+            let mut new_plan = plan.clone();
+            add_sort_above(&mut new_plan, parent_required_expr)?;
+            Ok(Some(
+                PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+            ))
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let child_plan = plan.children()[0].clone();
+            match determine_children_requirement(
+                parent_required,
+                request_child,
+                child_plan,
+            ) {
+                RequirementsCompatibility::Satisfy => Ok(None),
+                RequirementsCompatibility::Compatible(adjusted) => {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![adjusted],
+                        ..requirements
+                    }))
+                }
+                RequirementsCompatibility::NonCompatible => {
+                    let WindowExecInfo {
+                        window_expr,
+                        input_schema,
+                        partition_keys,
+                    } = extract_window_info_from_plan(plan).unwrap();
+                    if should_reverse_window_exec(
+                        parent_required,
+                        request_child,
+                        &input_schema,
+                    ) {
+                        let new_physical_ordering = parent_required_expr.to_vec();
+                        let new_window_expr = window_expr
+                            .iter()
+                            .map(|e| e.get_reverse_expr())
+                            .collect::<Option<Vec<_>>>();
+                        if let Some(window_expr) = new_window_expr {
+                            let uses_bounded_memory =
+                                window_expr.iter().all(|e| e.uses_bounded_memory());
+                            let new_plan = if uses_bounded_memory {
+                                Arc::new(BoundedWindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            } else {
+                                Arc::new(WindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            };
+                            let adjusted_request_ordering =
+                                new_plan.required_input_ordering();
+                            return Ok(Some(PlanWithSortRequirements {
+                                plan: new_plan,
+                                impact_result_ordering: false,
+                                satisfy_single_distribution: requirements
+                                    .satisfy_single_distribution,
+                                required_ordering: None,
+                                adjusted_request_ordering,
+                            }));
+                        }
+                    }
+                    // Can not push down requirements, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
+            // If the current plan is SortMergeJoinExec
+            let left_columns_len = smj.left.schema().fields().len();
+            let expr_source_side =
+                expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
+            match expr_source_side {
+                Some(JoinSide::Left) if maintains_input_order[0] => {
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        parent_required,
+                        parent_required_expr,
+                        JoinSide::Left,
+                    )
+                }
+                Some(JoinSide::Right) if maintains_input_order[1] => {
+                    let new_right_required = match smj.join_type {
+                        JoinType::Inner | JoinType::Right => shift_right_required(
+                            parent_required.unwrap(),
+                            left_columns_len,
+                        )?,
+                        JoinType::RightSemi | JoinType::RightAnti => {
+                            parent_required.unwrap().to_vec()
+                        }
+                        _ => Err(DataFusionError::Plan(
+                            "Unexpected SortMergeJoin type here".to_string(),
+                        ))?,
+                    };
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        Some(new_right_required.deref()),
+                        parent_required_expr,
+                        JoinSide::Right,
+                    )
+                }
+                _ => {
+                    // Can not decide the expr side for SortMergeJoinExec, can not push down, add SortExec;
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if plan.required_input_ordering().iter().any(Option::is_some) {
+            let plan_children = plan.children();
+            let compatible_with_children = izip!(
+                maintains_input_order.iter(),
+                plan.required_input_ordering().into_iter(),
+                plan_children.iter()
+            )
+            .map(|(can_push_down, request_child, child)| {
+                if *can_push_down {
+                    determine_children_requirement(
+                        parent_required,
+                        request_child.as_deref(),
+                        child.clone(),
+                    )
+                } else {
+                    RequirementsCompatibility::NonCompatible
+                }
+            })
+            .collect::<Vec<_>>();
+            if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Satisfy))
+            {
+                // Requirements are satisfied, no need to push down.
+                Ok(None)
+            } else if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Compatible(_)))
+            {
+                // Adjust child requirements and push down the requirements
+                let adjusted = parent_required.map(|r| r.to_vec());
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![adjusted; plan_children.len()],
+                    ..requirements
+                }))
+            } else {
+                // Can not push down, add new SortExec
+                let mut new_plan = plan.clone();
+                add_sort_above(&mut new_plan, parent_required_expr)?;
+                Ok(Some(
+                    PlanWithSortRequirements::new_without_impact_result_ordering(
+                        new_plan,
+                    ),
+                ))
+            }
+        } else {
+            // The current plan has no ordering requirements of its own for its children, so consider pushing down the requirements
+            if let Some(ProjectionExec { expr, .. }) =
+                plan.as_any().downcast_ref::<ProjectionExec>()
+            {
+                // For Projection, we need to transform the requirements to the columns before the Projection
+                // And then to push down the requirements
+                let new_adjusted =
+                    map_requirement_before_projection(parent_required, expr);
+                if new_adjusted.is_some() {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![new_adjusted],
+                        ..requirements
+                    }))
+                } else {
+                    // Can not push down, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            } else {
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![
+                        requirements.required_ordering;
+                        requirements
+                            .adjusted_request_ordering
+                            .len()
+                    ],
+                    ..requirements
+                }))
+            }
+        }
+    }
+}
+
+/// Analyzes a given `Sort` (`plan`) to determine whether the Sort can be removed:
+/// 1) The input already has a finer ordering than this `Sort` enforces.
+/// 2) The `Sort` does not impact the final result ordering.
+fn analyze_immediate_sort_removal(
+    requirements: &PlanWithSortRequirements,
+    sort_exec: &SortExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        sort_exec.input().output_ordering(),
+        sort_exec.output_ordering(),
+        || sort_exec.input().equivalence_properties(),
+    ) {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+            impact_result_ordering: requirements.impact_result_ordering,
+            satisfy_single_distribution: requirements.satisfy_single_distribution,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortExec
+    else if !requirements.impact_result_ordering {
+        if requirements.satisfy_single_distribution
+            && sort_exec.input().output_partitioning().partition_count() > 1
+        {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(CoalescePartitionsExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        } else {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        }
+    } else {
+        None
+    }
+}
+
+/// Analyzes a given `SortPreservingMergeExec` (`plan`) to determine whether the SortPreservingMergeExec can be removed:
+/// 1) The input already has a finer ordering than this `SortPreservingMergeExec` enforces.
+/// 2) The `SortPreservingMergeExec` does not impact the final result ordering.
+fn analyze_immediate_spm_removal(
+    requirements: &PlanWithSortRequirements,
+    spm_exec: &SortPreservingMergeExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        spm_exec.input().output_ordering(),
+        Some(spm_exec.expr()),
+        || spm_exec.input().equivalence_properties(),
+    ) && spm_exec.input().output_partitioning().partition_count() <= 1
+    {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: true,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortPreservingMergeExec only
+    else if !requirements.impact_result_ordering {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    } else {
+        None
+    }
+}
+
+/// Determine the children requirements
+/// If the children requirements are more specific, do not push down the parent requirements
+/// If the parent requirements are more specific, push down the parent requirements
+/// If they are not compatible, need to add Sort.
+fn determine_children_requirement(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    request_child: Option<&[PhysicalSortRequirements]>,
+    child_plan: Arc<dyn ExecutionPlan>,
+) -> RequirementsCompatibility {
+    if requirements_compatible(request_child, parent_required, || {
+        child_plan.equivalence_properties()
+    }) {
+        // request child requirements are more specific, no need to push down the parent requirements
+        RequirementsCompatibility::Satisfy
+    } else if requirements_compatible(parent_required, request_child, || {
+        child_plan.equivalence_properties()
+    }) {
+        // parent requirements are more specific, adjust the request child requirements and push down the new requirements
+        let adjusted = parent_required.map(|r| r.to_vec());
+        RequirementsCompatibility::Compatible(adjusted)
+    } else {
+        RequirementsCompatibility::NonCompatible
+    }
+}
+
+/// Compares window expression's `window_request` and `parent_required_expr` ordering, returns
+/// whether we should reverse the window expression's ordering in order to meet parent's requirements.
+fn check_alignment(
+    input_schema: &SchemaRef,
+    window_request: &PhysicalSortRequirements,
+    parent_required_expr: &PhysicalSortRequirements,
+) -> bool {
+    if parent_required_expr.expr.eq(&window_request.expr)
+        && window_request.sort_options.is_some()
+        && parent_required_expr.sort_options.is_some()
+    {
+        let nullable = parent_required_expr.expr.nullable(input_schema).unwrap();
+        let window_request_opts = window_request.sort_options.unwrap();
+        let parent_required_opts = parent_required_expr.sort_options.unwrap();
+        if nullable {
+            window_request_opts == reverse_sort_options(parent_required_opts)
+        } else {
+            // If the column is not nullable, NULLS FIRST/LAST is not important.
+            window_request_opts.descending != parent_required_opts.descending
+        }
+    } else {
+        false
+    }
+}
+
+fn reverse_window_sort_requirements(
+    request_child: Option<&[PhysicalSortRequirements]>,
+) -> Option<Vec<PhysicalSortRequirements>> {
+    request_child.map(|request| {
+        request
+            .iter()
+            .map(|req| match req.sort_options {
+                None => req.clone(),
+                Some(ops) => PhysicalSortRequirements {
+                    expr: req.expr.clone(),
+                    sort_options: Some(reverse_sort_options(ops)),
+                },
+            })
+            .collect::<Vec<_>>()
+    })
+}
+
+/// Whether to reverse the top WindowExec's sort requirements.
+/// Considers the requirements of the descendant WindowExecs and the leaf nodes' output ordering.
+/// TODO: consider all the cases
+fn should_reverse_window_sort_requirements(
+    window_plan: Arc<dyn ExecutionPlan>,
+    top_requirement: Option<&[PhysicalSortRequirements]>,
+    top_reversed_requirement: Option<&[PhysicalSortRequirements]>,
+) -> bool {
+    if top_requirement.is_none() {
+        return false;
+    }
+    let WindowExecInfo { window_expr, .. } =
+        extract_window_info_from_plan(&window_plan).unwrap();
+    let reverse_window_expr = window_expr
+        .iter()
+        .map(|e| e.get_reverse_expr())
+        .collect::<Option<Vec<_>>>();
+    if reverse_window_expr.is_none() {
+        return false;
+    }
+    let flags = window_plan
+        .children()
+        .into_iter()
+        .map(|child| {
+            // If the child is leaf node, check the output ordering
+            if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                false
+            } else if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_reversed_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                true
+            } else if child.as_any().downcast_ref::<WindowAggExec>().is_some()
+                || child
+                    .as_any()
+                    .downcast_ref::<BoundedWindowAggExec>()
+                    .is_some()
+            {
+                // If the child is WindowExec, check the child requirements
+                if requirements_compatible(
+                    top_requirement,
+                    child.required_input_ordering()[0].as_deref(),
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    child.required_input_ordering()[0].as_deref(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(

Review Comment:
   This pattern is used a lot. I think you could add a utility that checks whether either of the requirements satisfies the other (a single function that does this check). That would be cleaner.
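
A minimal sketch of the kind of utility suggested here, under simplifying assumptions: `SortReq`, `compatible`, and `either_compatible` are hypothetical stand-ins rather than names from the PR, and the check ignores equivalence properties, which the real `requirements_compatible` also consults.

```rust
/// Hypothetical, simplified sort requirement: a column name plus an optional
/// direction (`None` means any direction is acceptable).
#[derive(Clone, Debug, PartialEq)]
struct SortReq {
    column: String,
    descending: Option<bool>,
}

/// Simplified analogue of `requirements_compatible`: true when `provided` is
/// equal to or more specific than `required` (prefix comparison only).
fn compatible(provided: Option<&[SortReq]>, required: Option<&[SortReq]>) -> bool {
    match (provided, required) {
        (_, None) => true,
        (None, Some(_)) => false,
        (Some(p), Some(r)) => {
            r.len() <= p.len()
                && r.iter().zip(p).all(|(req, pro)| {
                    req.column == pro.column
                        && (req.descending.is_none() || req.descending == pro.descending)
                })
        }
    }
}

/// Hypothetical helper that folds the repeated two-way check into one call:
/// true when either side is at least as specific as the other.
fn either_compatible(a: Option<&[SortReq]>, b: Option<&[SortReq]>) -> bool {
    compatible(a, b) || compatible(b, a)
}
```

With a helper of this shape, each pair of mirrored `requirements_compatible` calls in the hunk above could collapse into a single call, assuming the same equivalence-properties closure can be shared by both directions.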





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1502800054

   Just close this PR.




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1114107508


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -235,6 +268,186 @@ pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
     }
 }
 
+/// Checks whether the required ordering requirements are satisfied by the provided [PhysicalSortExpr]s.
+pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortExpr]>,
+    required: Option<&[PhysicalSortRequirements]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            ordering_satisfy_requirement_concrete(provided, required, equal_properties)
+        }
+    }
+}
+
+pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> EquivalenceProperties>(
+    provided: &[PhysicalSortExpr],
+    required: &[PhysicalSortRequirements],
+    equal_properties: F,
+) -> bool {
+    if required.len() > provided.len() {
+        false
+    } else if required
+        .iter()
+        .zip(provided.iter())
+        .all(|(order1, order2)| order2.satisfy(order1))
+    {
+        true
+    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+        let normalized_requirements = required
+            .iter()
+            .map(|e| {
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
+            })
+            .collect::<Vec<_>>();
+        let normalized_provided_exprs = provided
+            .iter()
+            .map(|e| {
+                normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+            })
+            .collect::<Vec<_>>();
+        normalized_requirements
+            .iter()
+            .zip(normalized_provided_exprs.iter())
+            .all(|(order1, order2)| order2.satisfy(order1))
+    } else {
+        false
+    }
+}
+
+/// Provided requirements are compatible with the required, which means the provided requirements are equal or more specific than the required
+pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortRequirements]>,
+    required: Option<&[PhysicalSortRequirements]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            if required.len() > provided.len() {
+                false
+            } else if required
+                .iter()
+                .zip(provided.iter())
+                .all(|(req, pro)| pro.compatible(req))
+            {
+                true
+            } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+                let normalized_required = required
+                    .iter()
+                    .map(|e| {
+                        normalize_sort_requirement_with_equivalence_properties(
+                            e.clone(),
+                            eq_classes,
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                let normalized_provided = provided
+                    .iter()
+                    .map(|e| {
+                        normalize_sort_requirement_with_equivalence_properties(
+                            e.clone(),
+                            eq_classes,
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                normalized_required
+                    .iter()
+                    .zip(normalized_provided.iter())
+                    .all(|(req, pro)| pro.compatible(req))
+            } else {
+                false
+            }
+        }
+    }
+}
+
+pub fn map_columns_before_projection(
+    parent_required: &[Arc<dyn PhysicalExpr>],
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Vec<Arc<dyn PhysicalExpr>> {
+    let mut column_mapping = HashMap::new();
+    for (expression, name) in proj_exprs.iter() {
+        if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+            column_mapping.insert(name.clone(), column.clone());
+        };
+    }
+    let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
+        .iter()
+        .filter_map(|r| {
+            if let Some(column) = r.as_any().downcast_ref::<Column>() {
+                column_mapping.get(column.name())
+            } else {
+                None
+            }
+        })
+        .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
+        .collect::<Vec<_>>();
+    new_required
+}
+
+pub fn map_requirement_before_projection(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Option<Vec<PhysicalSortRequirements>> {
+    if let Some(requirement) = parent_required {
+        let required_expr = create_sort_expr_from_requirement(requirement)
+            .iter()
+            .map(|sort_expr| sort_expr.expr.clone())
+            .collect::<Vec<_>>();
+        let new_exprs = map_columns_before_projection(&required_expr, proj_exprs);
+        if new_exprs.len() == requirement.len() {
+            let new_request = new_exprs
+                .iter()
+                .zip(requirement.iter())
+                .map(|(new, old)| PhysicalSortRequirements {
+                    expr: new.clone(),
+                    sort_options: old.sort_options,
+                })
+                .collect::<Vec<_>>();
+            Some(new_request)
+        } else {
+            None
+        }
+    } else {
+        None
+    }
+}
+
+pub fn create_sort_expr_from_requirement(
+    required: &[PhysicalSortRequirements],
+) -> Vec<PhysicalSortExpr> {
+    let parent_required_expr = required
+        .iter()
+        .map(|prop| {
+            if prop.sort_options.is_some() {
+                PhysicalSortExpr {
+                    expr: prop.expr.clone(),
+                    options: prop.sort_options.unwrap(),
+                }
+            } else {

Review Comment:
   Sure.





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112611424


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule uses a top-down approach to inspect the `SortExec`s in the given physical plan and removes the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();

Review Comment:
   Instead of this pattern you can use `plan.as_any().is::<LocalLimitExec>()`
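
To illustrate the suggestion: the standard library's `dyn Any` has an `is::<T>()` method that returns the same boolean as `downcast_ref::<T>().is_some()`, without building the intermediate `Option`. A minimal sketch follows. The `ExecutionPlan` trait and the two limit structs here are hypothetical stand-ins, not DataFusion's real types, and `is_limit_verbose` / `is_limit` are illustrative helper names.

```rust
use std::any::Any;

// Hypothetical stand-ins for the operators named in the diff.
struct GlobalLimitExec;
struct LocalLimitExec;
struct FilterExec;

// Hypothetical, reduced version of a plan trait exposing `as_any`.
trait ExecutionPlan {
    fn as_any(&self) -> &dyn Any;
}

impl ExecutionPlan for GlobalLimitExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl ExecutionPlan for LocalLimitExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl ExecutionPlan for FilterExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// The pattern used in the diff: downcast, then test the Option.
fn is_limit_verbose(plan: &dyn ExecutionPlan) -> bool {
    plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
        || plan.as_any().downcast_ref::<LocalLimitExec>().is_some()
}

/// The suggested pattern: ask `dyn Any` for the type check directly.
fn is_limit(plan: &dyn ExecutionPlan) -> bool {
    let any = plan.as_any();
    any.is::<GlobalLimitExec>() || any.is::<LocalLimitExec>()
}
```

Both helpers should agree for every plan node; the second form is only shorter to read.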





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1441256555

   @mustafasrepo @ozankabak 
   After a closer look at the test comparison results, I think the differences mainly relate to order preservation (`maintains_input_order`). I have highlighted the two test cases.
   From my point of view, the rules should not remove those Sorts. The rules should respect `maintains_input_order()` when deciding which Sorts can be removed, even if that sometimes keeps more Sorts than strictly necessary.
   
   In the TopDown rule implementation, I can add another configuration flag to enable removing Sorts more aggressively and achieve the same results as the Bottom-up rule, but we still need to define clearly in which cases a Sort can be removed.
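
As a rough illustration of the policy described above (not code from the PR), the sketch below models a plan as a toy tree and keeps a Sort only while every operator between it and the root preserves its input order. The `Plan` enum and `remove_unneeded_sorts` are hypothetical names, and the model assumes operators have no ordering requirements of their own, which the real rule does have to handle.

```rust
// Toy plan tree; a hypothetical model, not DataFusion's ExecutionPlan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Source,
    Sort(Box<Plan>),
    // A single-input operator. The flag mirrors the idea behind
    // `maintains_input_order()` discussed above.
    Op {
        maintains_input_order: bool,
        input: Box<Plan>,
    },
}

/// Walks the tree top-down. `ordering_observable` is true while every
/// operator between the current node and the root preserves input order,
/// i.e. while a Sort at this point can still affect the final result.
fn remove_unneeded_sorts(plan: Plan, ordering_observable: bool) -> Plan {
    match plan {
        Plan::Source => Plan::Source,
        Plan::Sort(input) => {
            // Anything sorted below this node is overwritten by it,
            // so descendants cannot affect the final ordering.
            let input = remove_unneeded_sorts(*input, false);
            if ordering_observable {
                Plan::Sort(Box::new(input))
            } else {
                input
            }
        }
        Plan::Op {
            maintains_input_order,
            input,
        } => Plan::Op {
            maintains_input_order,
            input: Box::new(remove_unneeded_sorts(
                *input,
                ordering_observable && maintains_input_order,
            )),
        },
    }
}
```

Under this model a Sort below an order-preserving operator survives, while a Sort below an operator that does not preserve order, or below another Sort, is dropped. A more aggressive configuration flag would amount to relaxing the condition under which `ordering_observable` stays true.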




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112654984


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach that inspects the SortExecs in the given physical plan and removes the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a Top-Down process(Preorder Traversal) to ensure the sort requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Execute a Top-Down process(Preorder Traversal) to remove all the unnecessary Sort
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
+        if let Some(child_sort_exec) =
+            sort_pres_exec.input().as_any().downcast_ref::<SortExec>()
+        {
+            if sort_pres_exec.expr() == child_sort_exec.expr() {
+                if let Some(result) =
+                    analyze_immediate_sort_removal(&requirements, child_sort_exec)
+                {
+                    return Ok(Some(result));
+                }
+            }
+        } else if !requirements.satisfy_single_distribution
+            || sort_pres_exec
+                .input()
+                .output_partitioning()
+                .partition_count()
+                <= 1
+        {
+            if let Some(result) =
+                analyze_immediate_spm_removal(&requirements, sort_pres_exec)
+            {
+                return Ok(Some(result));
+            }
+        }
+    }
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
+        plan.equivalence_properties()
+    }) {
+        // Can satisfy the parent requirements, change the adjusted_request_ordering for UnionExec and WindowAggExec(BoundedWindowAggExec)
+        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+            // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
+            // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
+            let adjusted = new_sort_requirements(union_exec.output_ordering());
+            return Ok(Some(PlanWithSortRequirements {
+                required_ordering: None,
+                adjusted_request_ordering: vec![
+                    adjusted;
+                    requirements
+                        .adjusted_request_ordering
+                        .len()
+                ],
+                ..requirements
+            }));
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            // WindowAggExec(BoundedWindowAggExec) might reverse their sort requirements
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let reversed_request_child = reverse_window_sort_requirements(request_child);
+
+            if should_reverse_window_sort_requirements(
+                plan.clone(),
+                request_child,
+                reversed_request_child.as_deref(),
+            ) {
+                let WindowExecInfo {
+                    window_expr,
+                    input_schema,
+                    partition_keys,
+                } = extract_window_info_from_plan(plan).unwrap();
+
+                let new_window_expr = window_expr
+                    .iter()
+                    .map(|e| e.get_reverse_expr())
+                    .collect::<Option<Vec<_>>>();
+                let new_physical_ordering = create_sort_expr_from_requirement(
+                    reversed_request_child.clone().unwrap().as_ref(),
+                );
+                if let Some(window_expr) = new_window_expr {
+                    let uses_bounded_memory =
+                        window_expr.iter().all(|e| e.uses_bounded_memory());
+                    // If all window expressions can run with bounded memory, choose the
+                    // bounded window variant:
+                    let new_plan = if uses_bounded_memory {
+                        Arc::new(BoundedWindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    } else {
+                        Arc::new(WindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    };
+                    return Ok(Some(PlanWithSortRequirements {
+                        plan: new_plan,
+                        impact_result_ordering: false,
+                        satisfy_single_distribution: requirements
+                            .satisfy_single_distribution,
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![reversed_request_child],
+                    }));
+                }
+            }
+        }
+        Ok(Some(PlanWithSortRequirements {
+            required_ordering: None,
+            ..requirements
+        }))
+    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        // If the current plan is a SortExec, modify current SortExec to satisfy the parent requirements
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let mut new_plan = sort_exec.input.clone();
+        add_sort_above(&mut new_plan, parent_required_expr)?;
+        Ok(Some(
+            PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+        ))
+    } else {
+        // Can not satisfy the parent requirements, check whether the requirements can be pushed down. If not, add new SortExec.
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let maintains_input_order = plan.maintains_input_order();
+        // If the current plan is a leaf node or can not maintain any of the input ordering, can not pushed down requirements.
+        // For RepartitionExec, we always choose to not push down the sort requirements even the RepartitionExec(input_partition=1) could maintain input ordering.
+        // For UnionExec, we can always push down
+        if (maintains_input_order.is_empty()
+            || !maintains_input_order.iter().any(|o| *o)
+            || plan.as_any().downcast_ref::<RepartitionExec>().is_some()
+            || plan.as_any().downcast_ref::<FilterExec>().is_some()
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some())
+            && plan.as_any().downcast_ref::<UnionExec>().is_none()
+        {
+            let mut new_plan = plan.clone();
+            add_sort_above(&mut new_plan, parent_required_expr)?;
+            Ok(Some(
+                PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+            ))
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let child_plan = plan.children()[0].clone();
+            match determine_children_requirement(
+                parent_required,
+                request_child,
+                child_plan,
+            ) {
+                RequirementsCompatibility::Satisfy => Ok(None),
+                RequirementsCompatibility::Compatible(adjusted) => {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![adjusted],
+                        ..requirements
+                    }))
+                }
+                RequirementsCompatibility::NonCompatible => {
+                    let WindowExecInfo {
+                        window_expr,
+                        input_schema,
+                        partition_keys,
+                    } = extract_window_info_from_plan(plan).unwrap();
+                    if should_reverse_window_exec(
+                        parent_required,
+                        request_child,
+                        &input_schema,
+                    ) {
+                        let new_physical_ordering = parent_required_expr.to_vec();
+                        let new_window_expr = window_expr
+                            .iter()
+                            .map(|e| e.get_reverse_expr())
+                            .collect::<Option<Vec<_>>>();
+                        if let Some(window_expr) = new_window_expr {
+                            let uses_bounded_memory =
+                                window_expr.iter().all(|e| e.uses_bounded_memory());
+                            let new_plan = if uses_bounded_memory {
+                                Arc::new(BoundedWindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            } else {
+                                Arc::new(WindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            };
+                            let adjusted_request_ordering =
+                                new_plan.required_input_ordering();
+                            return Ok(Some(PlanWithSortRequirements {
+                                plan: new_plan,
+                                impact_result_ordering: false,
+                                satisfy_single_distribution: requirements
+                                    .satisfy_single_distribution,
+                                required_ordering: None,
+                                adjusted_request_ordering,
+                            }));
+                        }
+                    }
+                    // Can not push down requirements, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
+            // If the current plan is SortMergeJoinExec
+            let left_columns_len = smj.left.schema().fields().len();
+            let expr_source_side =
+                expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
+            match expr_source_side {
+                Some(JoinSide::Left) if maintains_input_order[0] => {
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        parent_required,
+                        parent_required_expr,
+                        JoinSide::Left,
+                    )
+                }
+                Some(JoinSide::Right) if maintains_input_order[1] => {
+                    let new_right_required = match smj.join_type {
+                        JoinType::Inner | JoinType::Right => shift_right_required(
+                            parent_required.unwrap(),
+                            left_columns_len,
+                        )?,
+                        JoinType::RightSemi | JoinType::RightAnti => {
+                            parent_required.unwrap().to_vec()
+                        }
+                        _ => Err(DataFusionError::Plan(
+                            "Unexpected SortMergeJoin type here".to_string(),
+                        ))?,
+                    };
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        Some(new_right_required.deref()),
+                        parent_required_expr,
+                        JoinSide::Right,
+                    )
+                }
+                _ => {
+                    // Can not decide the expr side for SortMergeJoinExec, can not push down, add SortExec;
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if plan.required_input_ordering().iter().any(Option::is_some) {
+            let plan_children = plan.children();
+            let compatible_with_children = izip!(
+                maintains_input_order.iter(),
+                plan.required_input_ordering().into_iter(),
+                plan_children.iter()
+            )
+            .map(|(can_push_down, request_child, child)| {
+                if *can_push_down {
+                    determine_children_requirement(
+                        parent_required,
+                        request_child.as_deref(),
+                        child.clone(),
+                    )
+                } else {
+                    RequirementsCompatibility::NonCompatible
+                }
+            })
+            .collect::<Vec<_>>();
+            if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Satisfy))
+            {
+                // Requirements are satisfied, not need to push down.
+                Ok(None)
+            } else if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Compatible(_)))
+            {
+                // Adjust child requirements and push down the requirements
+                let adjusted = parent_required.map(|r| r.to_vec());
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![adjusted; plan_children.len()],
+                    ..requirements
+                }))
+            } else {
+                // Can not push down, add new SortExec
+                let mut new_plan = plan.clone();
+                add_sort_above(&mut new_plan, parent_required_expr)?;
+                Ok(Some(
+                    PlanWithSortRequirements::new_without_impact_result_ordering(
+                        new_plan,
+                    ),
+                ))
+            }
+        } else {
+            // The current plan does not have its own ordering requirements to its children, consider push down the requirements
+            if let Some(ProjectionExec { expr, .. }) =
+                plan.as_any().downcast_ref::<ProjectionExec>()
+            {
+                // For Projection, we need to transform the requirements to the columns before the Projection
+                // And then to push down the requirements
+                let new_adjusted =
+                    map_requirement_before_projection(parent_required, expr);
+                if new_adjusted.is_some() {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![new_adjusted],
+                        ..requirements
+                    }))
+                } else {
+                    // Can not push down, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            } else {
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![
+                        requirements.required_ordering;
+                        requirements
+                            .adjusted_request_ordering
+                            .len()
+                    ],
+                    ..requirements
+                }))
+            }
+        }
+    }
+}
+
+/// Analyzes a given `Sort` (`plan`) to determine whether the Sort can be removed:
+/// 1) The input already has a finer ordering than this `Sort` enforces.
+/// 2) The `Sort` does not impact the final result ordering.
+fn analyze_immediate_sort_removal(
+    requirements: &PlanWithSortRequirements,
+    sort_exec: &SortExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        sort_exec.input().output_ordering(),
+        sort_exec.output_ordering(),
+        || sort_exec.input().equivalence_properties(),
+    ) {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+            impact_result_ordering: requirements.impact_result_ordering,
+            satisfy_single_distribution: requirements.satisfy_single_distribution,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortExec
+    else if !requirements.impact_result_ordering {
+        if requirements.satisfy_single_distribution
+            && sort_exec.input().output_partitioning().partition_count() > 1
+        {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(CoalescePartitionsExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        } else {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        }
+    } else {
+        None
+    }
+}
+
+/// Analyzes a given `SortPreservingMergeExec` (`plan`) to determine whether the SortPreservingMergeExec can be removed:
+/// 1) The input already has a finer ordering than this `SortPreservingMergeExec` enforces.
+/// 2) The `SortPreservingMergeExec` does not impact the final result ordering.
+fn analyze_immediate_spm_removal(
+    requirements: &PlanWithSortRequirements,
+    spm_exec: &SortPreservingMergeExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        spm_exec.input().output_ordering(),
+        Some(spm_exec.expr()),
+        || spm_exec.input().equivalence_properties(),
+    ) && spm_exec.input().output_partitioning().partition_count() <= 1
+    {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: true,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortPreservingMergeExec only
+    else if !requirements.impact_result_ordering {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    } else {
+        None
+    }
+}
+
+/// Determine the children requirements
+/// If the children requirements are more specific, do not push down the parent requirements
+/// If the parent requirements are more specific, push down the parent requirements
+/// If they are not compatible, need to add Sort.
+fn determine_children_requirement(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    request_child: Option<&[PhysicalSortRequirements]>,
+    child_plan: Arc<dyn ExecutionPlan>,
+) -> RequirementsCompatibility {
+    if requirements_compatible(request_child, parent_required, || {
+        child_plan.equivalence_properties()
+    }) {
+        // request child requirements are more specific, no need to push down the parent requirements
+        RequirementsCompatibility::Satisfy
+    } else if requirements_compatible(parent_required, request_child, || {
+        child_plan.equivalence_properties()
+    }) {
+        // parent requirements are more specific, adjust the request child requirements and push down the new requirements
+        let adjusted = parent_required.map(|r| r.to_vec());
+        RequirementsCompatibility::Compatible(adjusted)
+    } else {
+        RequirementsCompatibility::NonCompatible
+    }
+}
+
+/// Compares window expression's `window_request` and `parent_required_expr` ordering, returns
+/// whether we should reverse the window expression's ordering in order to meet parent's requirements.
+fn check_alignment(
+    input_schema: &SchemaRef,
+    window_request: &PhysicalSortRequirements,
+    parent_required_expr: &PhysicalSortRequirements,
+) -> bool {
+    if parent_required_expr.expr.eq(&window_request.expr)
+        && window_request.sort_options.is_some()
+        && parent_required_expr.sort_options.is_some()
+    {
+        let nullable = parent_required_expr.expr.nullable(input_schema).unwrap();
+        let window_request_opts = window_request.sort_options.unwrap();
+        let parent_required_opts = parent_required_expr.sort_options.unwrap();
+        if nullable {
+            window_request_opts == reverse_sort_options(parent_required_opts)
+        } else {
+            // If the column is not nullable, NULLS FIRST/LAST is not important.
+            window_request_opts.descending != parent_required_opts.descending
+        }
+    } else {
+        false
+    }
+}
+
+fn reverse_window_sort_requirements(
+    request_child: Option<&[PhysicalSortRequirements]>,
+) -> Option<Vec<PhysicalSortRequirements>> {
+    request_child.map(|request| {
+        request
+            .iter()
+            .map(|req| match req.sort_options {
+                None => req.clone(),
+                Some(ops) => PhysicalSortRequirements {
+                    expr: req.expr.clone(),
+                    sort_options: Some(reverse_sort_options(ops)),
+                },
+            })
+            .collect::<Vec<_>>()
+    })
+}
+
+/// Whether to reverse the top WindowExec's sort requirements.
+/// Considering the requirements of the descendants WindowExecs and leaf nodes' output ordering.
+/// TODO: considering all the cases
+fn should_reverse_window_sort_requirements(
+    window_plan: Arc<dyn ExecutionPlan>,
+    top_requirement: Option<&[PhysicalSortRequirements]>,
+    top_reversed_requirement: Option<&[PhysicalSortRequirements]>,
+) -> bool {
+    if top_requirement.is_none() {
+        return false;
+    }
+    let WindowExecInfo { window_expr, .. } =
+        extract_window_info_from_plan(&window_plan).unwrap();
+    let reverse_window_expr = window_expr
+        .iter()
+        .map(|e| e.get_reverse_expr())
+        .collect::<Option<Vec<_>>>();
+    if reverse_window_expr.is_none() {
+        return false;
+    }
+    let flags = window_plan
+        .children()
+        .into_iter()
+        .map(|child| {
+            // If the child is leaf node, check the output ordering
+            if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                false
+            } else if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_reversed_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                true
+            } else if child.as_any().downcast_ref::<WindowAggExec>().is_some()
+                || child
+                    .as_any()
+                    .downcast_ref::<BoundedWindowAggExec>()
+                    .is_some()
+            {
+                // If the child is WindowExec, check the child requirements
+                if requirements_compatible(
+                    top_requirement,
+                    child.required_input_ordering()[0].as_deref(),
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    child.required_input_ordering()[0].as_deref(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    top_reversed_requirement,
+                    child.required_input_ordering()[0].as_deref(),
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    child.required_input_ordering()[0].as_deref(),
+                    top_reversed_requirement,
+                    || child.equivalence_properties(),
+                ) {
+                    should_reverse_window_sort_requirements(
+                        child,
+                        top_requirement,
+                        top_reversed_requirement,
+                    )
+                } else {
+                    requirements_compatible(
+                        top_reversed_requirement,
+                        window_plan.required_input_ordering()[0].as_deref(),
+                        || window_plan.equivalence_properties(),
+                    ) || requirements_compatible(
+                        window_plan.required_input_ordering()[0].as_deref(),
+                        top_reversed_requirement,
+                        || window_plan.equivalence_properties(),
+                    )
+                }
+            } else {
+                requirements_compatible(
+                    top_reversed_requirement,
+                    window_plan.required_input_ordering()[0].as_deref(),
+                    || window_plan.equivalence_properties(),
+                ) || requirements_compatible(
+                    window_plan.required_input_ordering()[0].as_deref(),
+                    top_reversed_requirement,
+                    || window_plan.equivalence_properties(),
+                )
+            }
+        })
+        .collect::<Vec<_>>();
+
+    flags.iter().all(|o| *o)
+}
+
+fn should_reverse_window_exec(
+    required: Option<&[PhysicalSortRequirements]>,
+    request_ordering: Option<&[PhysicalSortRequirements]>,
+    input_schema: &SchemaRef,
+) -> bool {
+    match (required, request_ordering) {
+        (_, None) => false,
+        (None, Some(_)) => false,
+        (Some(required), Some(request_ordering)) => {
+            if required.len() > request_ordering.len() {
+                return false;
+            }
+            let alignment_flags = required
+                .iter()
+                .zip(request_ordering.iter())
+                .filter_map(|(required_expr, request_expr)| {
+                    // Only check the alignment of non-partition columns
+                    if request_expr.sort_options.is_some()
+                        && required_expr.sort_options.is_some()
+                    {
+                        Some(check_alignment(input_schema, request_expr, required_expr))
+                    } else if request_expr.expr.eq(&required_expr.expr) {
+                        None
+                    } else {
+                        Some(false)
+                    }
+                })
+                .collect::<Vec<_>>();
+            if alignment_flags.is_empty() {
+                false
+            } else {
+                alignment_flags.iter().all(|o| *o)
+            }
+        }
+    }
+}
+
+fn extract_window_info_from_plan(
+    plan: &Arc<dyn ExecutionPlan>,
+) -> Option<WindowExecInfo> {

Review Comment:
   I think this function can return `Result<WindowExecInfo>`. This change would remove some of the `unwrap`s.
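
A minimal sketch of what that change could look like, using stand-in types rather than the real DataFusion ones (`Plan`, `PlanError`, and the simplified `WindowExecInfo` below are all hypothetical). Once the extractor returns a `Result`, callers can propagate the failure with `?` instead of calling `unwrap`:

```rust
// Stand-in for the info extracted from a window operator (hypothetical, simplified).
#[derive(Debug, PartialEq)]
struct WindowExecInfo {
    window_expr: Vec<String>,
}

// Stand-in for an error type such as DataFusionError (hypothetical).
#[derive(Debug, PartialEq)]
enum PlanError {
    Plan(String),
}

// Stand-in for a physical plan node (hypothetical).
enum Plan {
    WindowAgg(Vec<String>),
    BoundedWindowAgg(Vec<String>),
    Filter,
}

// Returns Err instead of None when the plan is not a window operator.
fn extract_window_info(plan: &Plan) -> Result<WindowExecInfo, PlanError> {
    match plan {
        Plan::WindowAgg(exprs) | Plan::BoundedWindowAgg(exprs) => Ok(WindowExecInfo {
            window_expr: exprs.clone(),
        }),
        Plan::Filter => Err(PlanError::Plan(
            "expected a window operator".to_string(),
        )),
    }
}

// A caller that previously needed `.unwrap()` can now use `?`.
fn window_expr_count(plan: &Plan) -> Result<usize, PlanError> {
    let WindowExecInfo { window_expr } = extract_window_info(plan)?;
    Ok(window_expr.len())
}
```

The trade-off is that every caller has to return a `Result` as well, which may or may not fit the helpers in this file that currently return `bool`.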





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112635587


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach to inspect the SortExecs in the given physical plan and remove the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a Top-Down process(Preorder Traversal) to ensure the sort requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Execute a Top-Down process(Preorder Traversal) to remove all the unnecessary Sort
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)

Review Comment:
   Instead of `SortPreservingMergeExec + SortExec(local/global)`, you can use `SortExec(local/global) -> SortPreservingMergeExec`. I think the second form reflects the hierarchy better.





[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1433727056

   We have started the work to get the two approaches to a comparable state; @mustafasrepo is actively working on it. We will post more updates as we make progress.




[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1432486349

   Regarding reversing the WindowExec/window expressions, do we support the reverse for all the built-in window functions?
   For example, for `ROW_NUMBER() OVER `, I think we should not allow the reverse, but I cannot find a place that checks this or defines the list of allowed functions.
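
For context, the diff under review collects the `get_reverse_expr()` results into an `Option<Vec<_>>` and returns early when that is `None`. A rough sketch of that pattern, assuming each expression reports its own reversibility (the `WindowFn` type, its fields, and the function names below are hypothetical, not DataFusion APIs):

```rust
// Hypothetical stand-in for a window expression: `reverse` is None
// when the expression cannot be evaluated in reversed order.
struct WindowFn {
    name: &'static str,
    reverse: Option<&'static str>,
}

// Collecting an iterator of Option<T> into Option<Vec<T>> yields None
// as soon as any element is None, so one non-reversible expression
// vetoes the reversal of the whole operator.
fn reverse_all(exprs: &[WindowFn]) -> Option<Vec<&'static str>> {
    exprs.iter().map(|e| e.reverse).collect()
}

// Reports which expression blocked the reversal, if any.
fn first_non_reversible(exprs: &[WindowFn]) -> Option<&'static str> {
    exprs.iter().find(|e| e.reverse.is_none()).map(|e| e.name)
}
```

With this shape there is no central allow-list; each expression decides for itself. Whether every built-in function answers correctly is exactly the open question.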




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112608135


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -235,6 +268,186 @@ pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
     }
 }
 
+/// Checks whether the required ordering requirements are satisfied by the provided [PhysicalSortExpr]s.
+pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortExpr]>,
+    required: Option<&[PhysicalSortRequirements]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            ordering_satisfy_requirement_concrete(provided, required, equal_properties)
+        }
+    }
+}
+
+pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> EquivalenceProperties>(
+    provided: &[PhysicalSortExpr],
+    required: &[PhysicalSortRequirements],
+    equal_properties: F,
+) -> bool {
+    if required.len() > provided.len() {
+        false
+    } else if required
+        .iter()
+        .zip(provided.iter())
+        .all(|(order1, order2)| order2.satisfy(order1))
+    {
+        true
+    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+        let normalized_requirements = required
+            .iter()
+            .map(|e| {
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
+            })
+            .collect::<Vec<_>>();
+        let normalized_provided_exprs = provided
+            .iter()
+            .map(|e| {
+                normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+            })
+            .collect::<Vec<_>>();
+        normalized_requirements
+            .iter()
+            .zip(normalized_provided_exprs.iter())
+            .all(|(order1, order2)| order2.satisfy(order1))
+    } else {
+        false
+    }
+}
+
+/// Provided requirements are compatible with the required, which means the provided requirements are equal or more specific than the required
+pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortRequirements]>,
+    required: Option<&[PhysicalSortRequirements]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            if required.len() > provided.len() {
+                false
+            } else if required
+                .iter()
+                .zip(provided.iter())
+                .all(|(req, pro)| pro.compatible(req))
+            {
+                true
+            } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+                let normalized_required = required
+                    .iter()
+                    .map(|e| {
+                        normalize_sort_requirement_with_equivalence_properties(
+                            e.clone(),
+                            eq_classes,
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                let normalized_provided = provided
+                    .iter()
+                    .map(|e| {
+                        normalize_sort_requirement_with_equivalence_properties(
+                            e.clone(),
+                            eq_classes,
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                normalized_required
+                    .iter()
+                    .zip(normalized_provided.iter())
+                    .all(|(req, pro)| pro.compatible(req))
+            } else {
+                false
+            }
+        }
+    }
+}
+
+pub fn map_columns_before_projection(
+    parent_required: &[Arc<dyn PhysicalExpr>],
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Vec<Arc<dyn PhysicalExpr>> {
+    let mut column_mapping = HashMap::new();
+    for (expression, name) in proj_exprs.iter() {
+        if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+            column_mapping.insert(name.clone(), column.clone());
+        };
+    }
+    let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
+        .iter()
+        .filter_map(|r| {
+            if let Some(column) = r.as_any().downcast_ref::<Column>() {
+                column_mapping.get(column.name())
+            } else {
+                None
+            }
+        })
+        .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
+        .collect::<Vec<_>>();
+    new_required
+}
+
+pub fn map_requirement_before_projection(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Option<Vec<PhysicalSortRequirements>> {
+    if let Some(requirement) = parent_required {
+        let required_expr = create_sort_expr_from_requirement(requirement)
+            .iter()
+            .map(|sort_expr| sort_expr.expr.clone())
+            .collect::<Vec<_>>();
+        let new_exprs = map_columns_before_projection(&required_expr, proj_exprs);
+        if new_exprs.len() == requirement.len() {
+            let new_request = new_exprs
+                .iter()
+                .zip(requirement.iter())
+                .map(|(new, old)| PhysicalSortRequirements {
+                    expr: new.clone(),
+                    sort_options: old.sort_options,
+                })
+                .collect::<Vec<_>>();
+            Some(new_request)
+        } else {
+            None
+        }
+    } else {
+        None
+    }
+}
+
+pub fn create_sort_expr_from_requirement(
+    required: &[PhysicalSortRequirements],
+) -> Vec<PhysicalSortExpr> {
+    let parent_required_expr = required
+        .iter()
+        .map(|prop| {
+            if prop.sort_options.is_some() {
+                PhysicalSortExpr {
+                    expr: prop.expr.clone(),
+                    options: prop.sort_options.unwrap(),
+                }
+            } else {

Review Comment:
   I think we can use the `if let Some(sort_options) = prop.sort_options` idiom here. This would remove the `.unwrap()`.
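
A small sketch of the suggested idiom on stand-in types (`SortOptions`, `SortRequirement`, and `SortExpr` below are simplified, hypothetical versions of the real structs). The `else` branch is cut off in the quoted diff, so the fallback used here, a default `SortOptions`, is only a placeholder assumption:

```rust
// Simplified stand-ins for the Arrow/DataFusion types (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Default)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[derive(Debug, Clone, PartialEq)]
struct SortRequirement {
    expr: String,
    sort_options: Option<SortOptions>,
}

#[derive(Debug, Clone, PartialEq)]
struct SortExpr {
    expr: String,
    options: SortOptions,
}

fn sort_exprs_from_requirements(required: &[SortRequirement]) -> Vec<SortExpr> {
    required
        .iter()
        .map(|prop| {
            // `if let` binds the inner value directly, so no `.unwrap()` is needed.
            if let Some(options) = prop.sort_options {
                SortExpr {
                    expr: prop.expr.clone(),
                    options,
                }
            } else {
                // Placeholder fallback; the original branch is not shown in the diff.
                SortExpr {
                    expr: prop.expr.clone(),
                    options: SortOptions::default(),
                }
            }
        })
        .collect()
}
```

Since only the `options` field differs between the branches, `prop.sort_options.unwrap_or_default()` would be an even shorter alternative under the same assumption about the fallback.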





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112638030


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach to inspects SortExec's in the given physical plan and removes the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a Top-Down process(Preorder Traversal) to ensure the sort requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Execute a Top-Down process(Preorder Traversal) to remove all the unnecessary Sort
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
+        if let Some(child_sort_exec) =
+            sort_pres_exec.input().as_any().downcast_ref::<SortExec>()
+        {
+            if sort_pres_exec.expr() == child_sort_exec.expr() {
+                if let Some(result) =
+                    analyze_immediate_sort_removal(&requirements, child_sort_exec)
+                {
+                    return Ok(Some(result));
+                }
+            }
+        } else if !requirements.satisfy_single_distribution
+            || sort_pres_exec
+                .input()
+                .output_partitioning()
+                .partition_count()
+                <= 1
+        {
+            if let Some(result) =
+                analyze_immediate_spm_removal(&requirements, sort_pres_exec)
+            {
+                return Ok(Some(result));
+            }
+        }
+    }
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
+        plan.equivalence_properties()
+    }) {
+        // Can satisfy the parent requirements, change the adjusted_request_ordering for UnionExec and WindowAggExec(BoundedWindowAggExec)
+        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+            // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
+            // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
+            let adjusted = new_sort_requirements(union_exec.output_ordering());
+            return Ok(Some(PlanWithSortRequirements {
+                required_ordering: None,
+                adjusted_request_ordering: vec![
+                    adjusted;
+                    requirements
+                        .adjusted_request_ordering
+                        .len()
+                ],
+                ..requirements
+            }));
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {

Review Comment:
   You can write a utility function `is_window` that checks whether an executor is a `WindowAggExec` or a `BoundedWindowAggExec`. This check is done in a couple of places, so I think it will be handy.
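
   Something along these lines could work. This is only a sketch: the unit structs and the one-method `ExecutionPlan` trait below are hypothetical stand-ins so the snippet compiles on its own, and the real helper would presumably take the plan type used in the rule:

   ```rust
   use std::any::Any;

   /// Hypothetical stand-ins for the real operators; only the downcast matters here.
   pub struct WindowAggExec;
   pub struct BoundedWindowAggExec;
   pub struct FilterExec;

   /// Minimal stand-in for the part of the plan trait that the check needs.
   pub trait ExecutionPlan {
       fn as_any(&self) -> &dyn Any;
   }

   impl ExecutionPlan for WindowAggExec {
       fn as_any(&self) -> &dyn Any { self }
   }
   impl ExecutionPlan for BoundedWindowAggExec {
       fn as_any(&self) -> &dyn Any { self }
   }
   impl ExecutionPlan for FilterExec {
       fn as_any(&self) -> &dyn Any { self }
   }

   /// Returns true if `plan` is either kind of window operator.
   pub fn is_window(plan: &dyn ExecutionPlan) -> bool {
       let any = plan.as_any();
       any.is::<WindowAggExec>() || any.is::<BoundedWindowAggExec>()
   }
   ```

   Each call site then reduces to a single `is_window(...)` check instead of the two chained downcasts.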





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1431673538

   @mustafasrepo @ozankabak @yahoNanJing 
   Please help to take a look.
   




[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1432533070

   Not all built-in window functions are reversible. There is an indicator in the API called `get_reverse_expr` in the `WindowExpr` trait, which returns `None` if there is no equivalent reverse. For built-ins, this function calls `reverse_expr`, whose default value is `None`. Functions like `LEAD` and `LAG` override this to indicate reversibility, but `ROW_NUMBER` doesn't.
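
   To illustrate the pattern, here is a simplified sketch. The trait and structs are hypothetical stand-ins, not the actual DataFusion traits, and modelling the reverse of `LEAD` as a plain `LAG` (ignoring offsets) is an assumption made for brevity:

   ```rust
   /// Hypothetical, simplified trait; the real traits have many more methods.
   pub trait BuiltInWindowFunction {
       fn name(&self) -> &'static str;

       /// Default: there is no equivalent reverse expression.
       fn reverse_expr(&self) -> Option<Box<dyn BuiltInWindowFunction>> {
           None
       }
   }

   pub struct Lead;
   pub struct Lag;
   pub struct RowNumber;

   impl BuiltInWindowFunction for Lead {
       fn name(&self) -> &'static str { "LEAD" }
       // Overrides the default to indicate reversibility.
       fn reverse_expr(&self) -> Option<Box<dyn BuiltInWindowFunction>> {
           Some(Box::new(Lag))
       }
   }

   impl BuiltInWindowFunction for Lag {
       fn name(&self) -> &'static str { "LAG" }
       fn reverse_expr(&self) -> Option<Box<dyn BuiltInWindowFunction>> {
           Some(Box::new(Lead))
       }
   }

   impl BuiltInWindowFunction for RowNumber {
       // Keeps the default `reverse_expr`, so it reports no reverse.
       fn name(&self) -> &'static str { "ROW_NUMBER" }
   }

   /// The input ordering of a window operator can only be flipped if every
   /// one of its expressions has a reverse.
   pub fn can_reverse_all(exprs: &[Box<dyn BuiltInWindowFunction>]) -> bool {
       exprs.iter().all(|e| e.reverse_expr().is_some())
   }
   ```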




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112657573


##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -69,4 +69,56 @@ impl PhysicalSortExpr {
             options: Some(self.options),
         })
     }
+
+    pub fn satisfy(&self, requirement: &PhysicalSortRequirements) -> bool {
+        if requirement.sort_options.is_some() {
+            self.options == requirement.sort_options.unwrap()
+                && self.expr.eq(&requirement.expr)

Review Comment:
   You can use the `if let Some()` idiom here. This would remove the `.unwrap()` in the body.
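
   For example, roughly like this. The types are simplified, hypothetical stand-ins (expressions are plain strings), and since the `else` branch is not visible in the quoted hunk, comparing only the expressions there is an assumption:

   ```rust
   /// Simplified stand-in for the sort options type (hypothetical).
   #[derive(Debug, Clone, Copy, PartialEq)]
   pub struct SortOptions {
       pub descending: bool,
       pub nulls_first: bool,
   }

   /// Stand-in for a sort requirement with optional options.
   pub struct SortRequirement {
       pub expr: String,
       pub sort_options: Option<SortOptions>,
   }

   /// Stand-in for a sort expression with mandatory options.
   pub struct SortExpr {
       pub expr: String,
       pub options: SortOptions,
   }

   impl SortExpr {
       pub fn satisfy(&self, requirement: &SortRequirement) -> bool {
           if let Some(required_options) = requirement.sort_options {
               // Both the options and the expression must match.
               self.options == required_options && self.expr == requirement.expr
           } else {
               // Assumption: with no required options, only the expression must match.
               self.expr == requirement.expr
           }
       }
   }
   ```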





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1114074800


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach to inspects SortExec's in the given physical plan and removes the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();

Review Comment:
   Sure, will do.





[GitHub] [arrow-datafusion] mingmwang closed pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang closed pull request #5290: TopDown EnforceSorting implementation
URL: https://github.com/apache/arrow-datafusion/pull/5290




[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1454871370

   > What is the status of this PR (it looks like it may be waiting to address some more feedback). Shall we mark it as "draft" as we work on it (I am trying to make sure all PRs marked as "ready for review" are actually waiting on review)
   
   Sounds good to me, we will let you know when we converge




[GitHub] [arrow-datafusion] alamb commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1454861165

   What is the status of this PR (it looks like it may be waiting to address some more feedback). Shall we mark it as "draft" as we work on it (I am trying to make sure all PRs marked as "ready for review" are actually waiting on review)




[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1431854949

   Thank you, we will digest this in the next several days and leave comments as we make progress.




[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1432479006

   > The PR is very large in scope. It changes parts of the old code (and certainly makes some changes to its tests), and also adds new code (and new tests). It would be much easier to review this if it were broken down to two PRs, where the first one only replicates the current functionality, has no functionality regressions, and does not change any tests at all; with the second PR adding new functionality. Right now, the new rule is significantly longer than the old rule (which is bad), but it offers more functionality (which is great). So is switching from bottom-up to top-down a good change or a bad change? We can't tell easily.
   > 
   > Now, let me share my (very) preliminary impression so far after a cursory look: I see that it has better handling of sort preserving merges, smarter push-down of sorts under unions, and adds support for sort merge joins. These are the good bits. The cons are that it seems to lose partition awareness (though I'm not sure about this yet) and it seems to regress on some cases where it was doing better before. I think at least some of these are due to the presumption that there is a global output ordering to preserve, and I am not sure I agree with that.
   > 
   > Anyway, we will disentangle and review in detail, but I want to give you a heads up that this will take some time. We will need to analyze every case carefully, go back to the old version of the code (and tests), compare and contrast etc. Before we form an idea on the merits of bottom-up vs. top-down, our goal will be to create _two functionally equal_ implementations passing _exactly the same_ test suite. Without that, it is not possible to objectively decide.
   > 
   > Whatever the result on bottom-up vs. top-down is, I think this exercise will end up making the rule better, so that's great 🚀 I will keep you posted as we make progress in the upcoming days.
   
   Sure, please take your time. I will add more comments to the code to explain how the rule works.
   The new rule is significantly longer than the original one because it handles propagating sort requirements down the plan. But I think the sort removal/addition procedure is very clear, and the property/requirement-driven framework is more powerful. It can effectively handle the case below and find an optimal plan without adding and then removing sorts back and forth.
   
   ```
   Required('a', 'b', 'c')
      Required('a', 'b')
         Required('a')
   ```
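
   As a rough illustration of the idea (not the code in this PR), the sketch below walks the requirements from the top down and keeps the strictest compatible one, so a single sort at the bottom can satisfy every level. It assumes each operator maintains its input order and that column names are unchanged between levels; the function names are hypothetical:

   ```rust
   /// True if data ordered by `provided` also satisfies `required`,
   /// i.e. `required` is a prefix of `provided`.
   pub fn satisfies(provided: &[&str], required: &[&str]) -> bool {
       provided.len() >= required.len()
           && provided.iter().zip(required).all(|(p, r)| p == r)
   }

   /// Visits the requirements from the root downward and returns the one
   /// ordering to establish at the bottom, or `None` if two levels conflict.
   pub fn push_down<'a>(requirements: &[Vec<&'a str>]) -> Option<Vec<&'a str>> {
       let mut current: Vec<&'a str> = Vec::new();
       for req in requirements {
           if satisfies(&current, req) {
               // The ordering carried from above is already strict enough.
               continue;
           }
           if satisfies(req, &current) {
               // This level is stricter but compatible: carry it further down.
               current = req.clone();
           } else {
               // Incompatible orderings: a separate sort would be needed here.
               return None;
           }
       }
       Some(current)
   }
   ```

   For the three nested requirements above, this yields the single ordering `('a', 'b', 'c')`, which is why no intermediate sort has to be added and later removed.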
   
   We can leverage the same framework for more advanced optimizations, such as re-ordering the multiple window execs in the plan tree (PostgreSQL has this optimization) to further reduce the number of sorts. In general, I think the top-down approach makes it easier and more straightforward to collect and propagate the necessary properties and to find the globally optimal plan.
   
   ```
   Required/Order by ('a', 'b', 'c')
      WindowExec1 Required('a', 'b', 'c')
         WindowExec2 Required('x', 'y', 'z')
             Order by ('x', 'y', 'z')
   ```
   
   
   Some UT results have changed. Yes, I think the main point of contention is whether we should preserve the output ordering during optimization or whether we may trim unnecessary sort columns.
   As far as I know, `SparkSQL` preserves the output ordering (SparkSQL does not do very sophisticated sort optimizations), while `PostgreSQL` sometimes preserves the output ordering and sometimes does not (I guess this is decided by the top/parent operators and whether they are ordering-sensitive, but I'm not sure).
   For DataFusion, my preference is this: since we always define the `maintains_input_order()` method for physical plan nodes, when it returns true we should preserve the output ordering and should not trim or reverse it; otherwise `maintains_input_order()` is meaningless and very confusing.
   
   There are some other UT result changes. I am not sure whether they are due to bugs in the original rule or regressions introduced by the new rule; I need to confirm with you and check carefully, especially `test_window_agg_complex_plan`.
   
   




[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1438268537

   @mingmwang I went through this PR in detail. Thanks for the great work. Let me summarize my observations. 
   
   The Top-Down approach is better at pushing down `SortExec`s when that is helpful. However, the Bottom-Up approach is better at producing pipeline-friendly plans when possible.
   
   Since some tests improve and some regress with this change, I have constructed a unified test bench for these rules (the union of the tests for `SortEnforcement` and `TopDownSortEnforcement`). The test bench can be found in this [PR](https://github.com/synnada-ai/arrow-datafusion/pull/51). The PR body also includes a performance comparison of both rules on the unified test bench. I think we can reach a rule for which all tests in the test bench pass. I will try to do so myself, and I encourage you to try as well.
   
   In the meantime, this PR contains nice changes that do not need to wait for the final version of the rule. Specifically, printing the global flag in `SortExec` and the API change
   from `fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>>` 
   to `fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirements>>>` don't need to wait. I think you can file another PR with these changes.




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112626187


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach to inspects SortExec's in the given physical plan and removes the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()

Review Comment:
   The comment above applies here as well. You could also add a utility function `is_limit` that checks whether an Exec is a `GlobalLimitExec` or a `LocalLimitExec`. The same utility can then be used for the check in `PlanWithSortRequirements::init`.
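
A minimal sketch of such a helper is shown below. To keep it self-contained, the operator structs and the `ExecutionPlan` trait here are simplified stand-ins (hypothetical, reduced to the `as_any` method); in the PR the helper would take the real `Arc<dyn ExecutionPlan>` instead.

```rust
use std::any::Any;

// Simplified stand-ins for the real operators (hypothetical, illustration only).
struct GlobalLimitExec;
struct LocalLimitExec;
struct FilterExec;

// Stand-in for the part of the `ExecutionPlan` trait that the check relies on.
trait ExecutionPlan {
    fn as_any(&self) -> &dyn Any;
}

impl ExecutionPlan for GlobalLimitExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl ExecutionPlan for LocalLimitExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl ExecutionPlan for FilterExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Returns true if `plan` is a `GlobalLimitExec` or a `LocalLimitExec`.
fn is_limit(plan: &dyn ExecutionPlan) -> bool {
    let any = plan.as_any();
    any.is::<GlobalLimitExec>() || any.is::<LocalLimitExec>()
}
```

With a helper like this, both the `impact_result_ordering` computation in `init` and the check in `children` reduce to a single `is_limit(...)` call.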





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1118394012


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2129,16 +2131,17 @@ async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()>
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
-            "        SortExec: expr=[c9@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
-            "              CoalesceBatchesExec: target_batch_size=8192",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  SortExec: expr=[c1@0 ASC NULLS LAST], global=false",

Review Comment:
   Agree with you. It is redundant.





[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1446859911

   FYI @mingmwang, we created an implementation sketch combining bottom-up and top-down approaches, which passes the whole (unified) test set. You can find it [here](https://github.com/synnada-ai/arrow-datafusion/pull/55). We created this so that it can give us a baseline/goal in our quest to achieve full functionality with the simplest design possible.
   
   When you add the global ordering/window expr ordering functionality, we will do one more analysis and comparison with that baseline and give more feedback to you.




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1118395496


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2129,16 +2131,17 @@ async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()>
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
-            "        SortExec: expr=[c9@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
-            "              CoalesceBatchesExec: target_batch_size=8192",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  SortExec: expr=[c1@0 ASC NULLS LAST], global=false",

Review Comment:
   There is still room for optimization in the TopDown rule regarding window expr ordering and global ordering.





[GitHub] [arrow-datafusion] ozankabak commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1465039603

   @mingmwang, it seems you are busy these days. I think it might be a good idea to create a PR to get the new/extended test suite and a base (passing) implementation in.
   
   Concurrently, we can continue collaborating on this PR to arrive at a more elegant design that passes the new/extended test suite and replaces the base implementation. Sounds good?




[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1432889765

   Some future work:
   -  Support reordering multiple window expressions.
   -  `required_input_ordering()` should remove duplicate sort keys and columns that are equal under the same `EquivalenceProperties`.
   -  Introduce FDs (functional dependencies) to further avoid unnecessary sorts and repartitions.
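
As a rough illustration of the duplicate-sort-key item above, the sketch below drops repeated keys while keeping the first occurrence of each column. Sort keys are modeled as hypothetical `(column name, descending)` pairs rather than the real requirement type, and equivalence classes are not handled.

```rust
use std::collections::HashSet;

/// Removes repeated sort keys, keeping the first occurrence of each column.
/// Once rows are ordered by a column, a later key on the same column cannot
/// change the order, so it can be dropped regardless of its direction.
fn dedup_sort_keys(keys: &[(String, bool)]) -> Vec<(String, bool)> {
    let mut seen = HashSet::new();
    keys.iter()
        // `insert` returns false when the column was already present.
        .filter(|key| seen.insert(key.0.clone()))
        .cloned()
        .collect()
}
```

For example, the key list `[a ASC, b DESC, a DESC]` would reduce to `[a ASC, b DESC]`.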
   
   
   




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112646934


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example for sort removals: Assume that we
+//! somehow get the fragment
+//!
+//! ```text
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
+//! ```
+//!
+//! in the physical plan. The child sort is unnecessary since its result is overwritten
+//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// This rule implements a Top-Down approach to inspects SortExec's in the given physical plan and removes the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct TopDownEnforceSorting {}
+
+impl TopDownEnforceSorting {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a Top-Down process(Preorder Traversal) to ensure the sort requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Execute a Top-Down process(Preorder Traversal) to remove all the unnecessary Sort
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
+        if let Some(child_sort_exec) =
+            sort_pres_exec.input().as_any().downcast_ref::<SortExec>()
+        {
+            if sort_pres_exec.expr() == child_sort_exec.expr() {
+                if let Some(result) =
+                    analyze_immediate_sort_removal(&requirements, child_sort_exec)
+                {
+                    return Ok(Some(result));
+                }
+            }
+        } else if !requirements.satisfy_single_distribution
+            || sort_pres_exec
+                .input()
+                .output_partitioning()
+                .partition_count()
+                <= 1
+        {
+            if let Some(result) =
+                analyze_immediate_spm_removal(&requirements, sort_pres_exec)
+            {
+                return Ok(Some(result));
+            }
+        }
+    }
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
+        plan.equivalence_properties()
+    }) {
+        // Can satisfy the parent requirements, change the adjusted_request_ordering for UnionExec and WindowAggExec(BoundedWindowAggExec)
+        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+            // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
+            // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
+            let adjusted = new_sort_requirements(union_exec.output_ordering());
+            return Ok(Some(PlanWithSortRequirements {
+                required_ordering: None,
+                adjusted_request_ordering: vec![
+                    adjusted;
+                    requirements
+                        .adjusted_request_ordering
+                        .len()
+                ],
+                ..requirements
+            }));
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            // WindowAggExec(BoundedWindowAggExec) might reverse their sort requirements
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let reversed_request_child = reverse_window_sort_requirements(request_child);
+
+            if should_reverse_window_sort_requirements(
+                plan.clone(),
+                request_child,
+                reversed_request_child.as_deref(),
+            ) {
+                let WindowExecInfo {
+                    window_expr,
+                    input_schema,
+                    partition_keys,
+                } = extract_window_info_from_plan(plan).unwrap();
+
+                let new_window_expr = window_expr
+                    .iter()
+                    .map(|e| e.get_reverse_expr())
+                    .collect::<Option<Vec<_>>>();
+                let new_physical_ordering = create_sort_expr_from_requirement(
+                    reversed_request_child.clone().unwrap().as_ref(),
+                );
+                if let Some(window_expr) = new_window_expr {
+                    let uses_bounded_memory =
+                        window_expr.iter().all(|e| e.uses_bounded_memory());
+                    // If all window expressions can run with bounded memory, choose the
+                    // bounded window variant:
+                    let new_plan = if uses_bounded_memory {
+                        Arc::new(BoundedWindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    } else {
+                        Arc::new(WindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    };
+                    return Ok(Some(PlanWithSortRequirements {
+                        plan: new_plan,
+                        impact_result_ordering: false,
+                        satisfy_single_distribution: requirements
+                            .satisfy_single_distribution,
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![reversed_request_child],
+                    }));
+                }
+            }
+        }
+        Ok(Some(PlanWithSortRequirements {
+            required_ordering: None,
+            ..requirements
+        }))
+    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        // If the current plan is a SortExec, modify current SortExec to satisfy the parent requirements
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let mut new_plan = sort_exec.input.clone();
+        add_sort_above(&mut new_plan, parent_required_expr)?;
+        Ok(Some(
+            PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+        ))
+    } else {
+        // Can not satisfy the parent requirements, check whether the requirements can be pushed down. If not, add new SortExec.
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let maintains_input_order = plan.maintains_input_order();
+        // If the current plan is a leaf node or cannot maintain any of the input orderings, the requirements cannot be pushed down.
+        // For RepartitionExec, we always choose not to push down the sort requirements, even though RepartitionExec(input_partition=1) could maintain the input ordering.
+        // For UnionExec, we can always push down.
+        if (maintains_input_order.is_empty()
+            || !maintains_input_order.iter().any(|o| *o)
+            || plan.as_any().downcast_ref::<RepartitionExec>().is_some()
+            || plan.as_any().downcast_ref::<FilterExec>().is_some()
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some())
+            && plan.as_any().downcast_ref::<UnionExec>().is_none()
+        {
+            let mut new_plan = plan.clone();
+            add_sort_above(&mut new_plan, parent_required_expr)?;
+            Ok(Some(
+                PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+            ))
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            let request_child = requirements.adjusted_request_ordering[0].as_deref();
+            let child_plan = plan.children()[0].clone();
+            match determine_children_requirement(
+                parent_required,
+                request_child,
+                child_plan,
+            ) {
+                RequirementsCompatibility::Satisfy => Ok(None),
+                RequirementsCompatibility::Compatible(adjusted) => {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![adjusted],
+                        ..requirements
+                    }))
+                }
+                RequirementsCompatibility::NonCompatible => {
+                    let WindowExecInfo {
+                        window_expr,
+                        input_schema,
+                        partition_keys,
+                    } = extract_window_info_from_plan(plan).unwrap();
+                    if should_reverse_window_exec(
+                        parent_required,
+                        request_child,
+                        &input_schema,
+                    ) {
+                        let new_physical_ordering = parent_required_expr.to_vec();
+                        let new_window_expr = window_expr
+                            .iter()
+                            .map(|e| e.get_reverse_expr())
+                            .collect::<Option<Vec<_>>>();
+                        if let Some(window_expr) = new_window_expr {
+                            let uses_bounded_memory =
+                                window_expr.iter().all(|e| e.uses_bounded_memory());
+                            let new_plan = if uses_bounded_memory {
+                                Arc::new(BoundedWindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            } else {
+                                Arc::new(WindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            };
+                            let adjusted_request_ordering =
+                                new_plan.required_input_ordering();
+                            return Ok(Some(PlanWithSortRequirements {
+                                plan: new_plan,
+                                impact_result_ordering: false,
+                                satisfy_single_distribution: requirements
+                                    .satisfy_single_distribution,
+                                required_ordering: None,
+                                adjusted_request_ordering,
+                            }));
+                        }
+                    }
+                    // Can not push down requirements, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
+            // If the current plan is SortMergeJoinExec
+            let left_columns_len = smj.left.schema().fields().len();
+            let expr_source_side =
+                expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
+            match expr_source_side {
+                Some(JoinSide::Left) if maintains_input_order[0] => {
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        parent_required,
+                        parent_required_expr,
+                        JoinSide::Left,
+                    )
+                }
+                Some(JoinSide::Right) if maintains_input_order[1] => {
+                    let new_right_required = match smj.join_type {
+                        JoinType::Inner | JoinType::Right => shift_right_required(
+                            parent_required.unwrap(),
+                            left_columns_len,
+                        )?,
+                        JoinType::RightSemi | JoinType::RightAnti => {
+                            parent_required.unwrap().to_vec()
+                        }
+                        _ => Err(DataFusionError::Plan(
+                            "Unexpected SortMergeJoin type here".to_string(),
+                        ))?,
+                    };
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        Some(new_right_required.deref()),
+                        parent_required_expr,
+                        JoinSide::Right,
+                    )
+                }
+                _ => {
+                    // Can not decide the expr side for SortMergeJoinExec, can not push down, add SortExec;
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if plan.required_input_ordering().iter().any(Option::is_some) {
+            let plan_children = plan.children();
+            let compatible_with_children = izip!(
+                maintains_input_order.iter(),
+                plan.required_input_ordering().into_iter(),
+                plan_children.iter()
+            )
+            .map(|(can_push_down, request_child, child)| {
+                if *can_push_down {
+                    determine_children_requirement(
+                        parent_required,
+                        request_child.as_deref(),
+                        child.clone(),
+                    )
+                } else {
+                    RequirementsCompatibility::NonCompatible
+                }
+            })
+            .collect::<Vec<_>>();
+            if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Satisfy))
+            {
+                // Requirements are satisfied, no need to push down.
+                Ok(None)
+            } else if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Compatible(_)))
+            {
+                // Adjust child requirements and push down the requirements
+                let adjusted = parent_required.map(|r| r.to_vec());
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![adjusted; plan_children.len()],
+                    ..requirements
+                }))
+            } else {
+                // Can not push down, add new SortExec
+                let mut new_plan = plan.clone();
+                add_sort_above(&mut new_plan, parent_required_expr)?;
+                Ok(Some(
+                    PlanWithSortRequirements::new_without_impact_result_ordering(
+                        new_plan,
+                    ),
+                ))
+            }
+        } else {
+            // The current plan does not have its own ordering requirements to its children, consider push down the requirements
+            if let Some(ProjectionExec { expr, .. }) =
+                plan.as_any().downcast_ref::<ProjectionExec>()
+            {
+                // For Projection, we need to transform the requirements to the columns before the Projection
+                // And then to push down the requirements
+                let new_adjusted =
+                    map_requirement_before_projection(parent_required, expr);
+                if new_adjusted.is_some() {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![new_adjusted],
+                        ..requirements
+                    }))
+                } else {
+                    // Can not push down, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            } else {
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![
+                        requirements.required_ordering;
+                        requirements
+                            .adjusted_request_ordering
+                            .len()
+                    ],
+                    ..requirements
+                }))
+            }
+        }
+    }
+}
+
+/// Analyzes a given `Sort` (`plan`) to determine whether the Sort can be removed:
+/// 1) The input already has a finer ordering than this `Sort` enforces.
+/// 2) The `Sort` does not impact the final result ordering.
+fn analyze_immediate_sort_removal(
+    requirements: &PlanWithSortRequirements,
+    sort_exec: &SortExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        sort_exec.input().output_ordering(),
+        sort_exec.output_ordering(),
+        || sort_exec.input().equivalence_properties(),
+    ) {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+            impact_result_ordering: requirements.impact_result_ordering,
+            satisfy_single_distribution: requirements.satisfy_single_distribution,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortExec
+    else if !requirements.impact_result_ordering {
+        if requirements.satisfy_single_distribution
+            && sort_exec.input().output_partitioning().partition_count() > 1
+        {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(CoalescePartitionsExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        } else {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+            })
+        }
+    } else {
+        None
+    }
+}
+
+/// Analyzes a given `SortPreservingMergeExec` (`plan`) to determine whether the SortPreservingMergeExec can be removed:
+/// 1) The input already has a finer ordering than this `SortPreservingMergeExec` enforces.
+/// 2) The `SortPreservingMergeExec` does not impact the final result ordering.
+fn analyze_immediate_spm_removal(
+    requirements: &PlanWithSortRequirements,
+    spm_exec: &SortPreservingMergeExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        spm_exec.input().output_ordering(),
+        Some(spm_exec.expr()),
+        || spm_exec.input().equivalence_properties(),
+    ) && spm_exec.input().output_partitioning().partition_count() <= 1
+    {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: true,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortPreservingMergeExec only
+    else if !requirements.impact_result_ordering {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: vec![requirements.required_ordering.clone()],
+        })
+    } else {
+        None
+    }
+}
+
+/// Determine the children requirements
+/// If the children requirements are more specific, do not push down the parent requirements
+/// If the parent requirements are more specific, push down the parent requirements
+/// If they are not compatible, need to add Sort.
+fn determine_children_requirement(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    request_child: Option<&[PhysicalSortRequirements]>,
+    child_plan: Arc<dyn ExecutionPlan>,
+) -> RequirementsCompatibility {
+    if requirements_compatible(request_child, parent_required, || {
+        child_plan.equivalence_properties()
+    }) {
+        // request child requirements are more specific, no need to push down the parent requirements
+        RequirementsCompatibility::Satisfy
+    } else if requirements_compatible(parent_required, request_child, || {
+        child_plan.equivalence_properties()
+    }) {
+        // parent requirements are more specific, adjust the request child requirements and push down the new requirements
+        let adjusted = parent_required.map(|r| r.to_vec());
+        RequirementsCompatibility::Compatible(adjusted)
+    } else {
+        RequirementsCompatibility::NonCompatible
+    }
+}
+
+/// Compares window expression's `window_request` and `parent_required_expr` ordering, returns
+/// whether we should reverse the window expression's ordering in order to meet parent's requirements.
+fn check_alignment(
+    input_schema: &SchemaRef,
+    window_request: &PhysicalSortRequirements,
+    parent_required_expr: &PhysicalSortRequirements,
+) -> bool {
+    if parent_required_expr.expr.eq(&window_request.expr)
+        && window_request.sort_options.is_some()
+        && parent_required_expr.sort_options.is_some()
+    {
+        let nullable = parent_required_expr.expr.nullable(input_schema).unwrap();
+        let window_request_opts = window_request.sort_options.unwrap();
+        let parent_required_opts = parent_required_expr.sort_options.unwrap();
+        if nullable {
+            window_request_opts == reverse_sort_options(parent_required_opts)
+        } else {
+            // If the column is not nullable, NULLS FIRST/LAST is not important.
+            window_request_opts.descending != parent_required_opts.descending
+        }
+    } else {
+        false
+    }
+}
+
+fn reverse_window_sort_requirements(
+    request_child: Option<&[PhysicalSortRequirements]>,
+) -> Option<Vec<PhysicalSortRequirements>> {
+    request_child.map(|request| {
+        request
+            .iter()
+            .map(|req| match req.sort_options {
+                None => req.clone(),
+                Some(ops) => PhysicalSortRequirements {
+                    expr: req.expr.clone(),
+                    sort_options: Some(reverse_sort_options(ops)),
+                },
+            })
+            .collect::<Vec<_>>()
+    })
+}
+
+/// Whether to reverse the top WindowExec's sort requirements.
+/// Considering the requirements of the descendants WindowExecs and leaf nodes' output ordering.
+/// TODO: consider all the cases
+fn should_reverse_window_sort_requirements(
+    window_plan: Arc<dyn ExecutionPlan>,
+    top_requirement: Option<&[PhysicalSortRequirements]>,
+    top_reversed_requirement: Option<&[PhysicalSortRequirements]>,
+) -> bool {
+    if top_requirement.is_none() {
+        return false;
+    }
+    let WindowExecInfo { window_expr, .. } =
+        extract_window_info_from_plan(&window_plan).unwrap();
+    let reverse_window_expr = window_expr
+        .iter()
+        .map(|e| e.get_reverse_expr())
+        .collect::<Option<Vec<_>>>();
+    if reverse_window_expr.is_none() {
+        return false;
+    }
+    let flags = window_plan
+        .children()
+        .into_iter()
+        .map(|child| {
+            // If the child is leaf node, check the output ordering
+            if child.children().is_empty()

Review Comment:
   At this stage we know that the executor is a window operator, which has exactly one child. Hence I think we can remove this iteration and just use the body of `map`, with `child` set to `window_plan.children()[0].clone()`.
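
The suggestion above can be sketched with simplified stand-ins. `Node`, `child_allows_reverse`, `check_by_iteration`, and `check_single_child` are hypothetical names for illustration, not DataFusion's `ExecutionPlan` API, and combining the per-child flags with `all` is an assumption because the quoted diff is cut off before that point.

```rust
// Hypothetical, simplified stand-in for a plan node; not DataFusion's API.
#[derive(Debug, Clone)]
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

// Stand-in for the per-child check performed inside the `map` closure.
// Toy rule: a leaf child is treated as compatible.
fn child_allows_reverse(child: &Node) -> bool {
    child.children.is_empty()
}

// Shape of the code under review: iterate over all children and combine the flags.
fn check_by_iteration(window: &Node) -> bool {
    window.children.iter().map(child_allows_reverse).all(|flag| flag)
}

// Suggested shape: a window operator has exactly one child, so inspect it directly.
// `first()` returns `None` instead of panicking if that assumption is ever violated.
fn check_single_child(window: &Node) -> Option<bool> {
    window.children.first().map(child_allows_reverse)
}
```

With a single leaf child, both functions agree; the second form simply makes the single-child assumption explicit, whereas indexing with `children()[0]` would panic on an empty child list.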



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5290: TopDown EnforceSorting implementation

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#issuecomment-1438600953

   > @mingmwang I went through this PR in detail. Thanks for the great work. Let me summarize my observations.
   > 
   > The Top-Down approach is better at pushing down `SortExec`s when doing so is helpful. However, the Bottom-Up approach is better at producing pipeline-friendly plans when that is possible.
   > 
   > Since some tests improve and some regress with this change, I have constructed a unified test bench for these rules (I use the union of the tests for `SortEnforcement` and `TopDownSortEnforcement`). The test bench can be found in this [PR](https://github.com/synnada-ai/arrow-datafusion/pull/51). The PR body also includes a performance comparison of both rules on the unified test bench. I think we can arrive at a rule for which all tests in the test bench pass. I will try to do so myself, and I encourage you to try as well.
   > 
   > In the meantime, this PR has nice changes that do not need to wait for the final version of the rule. Specifically, printing the global flag in `SortExec` and the API change from `fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>>` to `fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirements>>>` don't need to wait. I think you can file another PR with these changes.
   
   Thanks a lot, I will take a closer look at your comparison results tomorrow. I guess most of the failing cases for the Top-Down rule arise because the current implementation tries to keep the final output ordering and does not remove all the `SortExec`s very aggressively. I will also resolve all the review comments tomorrow. Regarding splitting the PR, I do want to split it into two, because besides the new rule implementation and the newly added tests, the other changes in this PR are very small.
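
To illustrate why the API change mentioned above matters, here is a minimal sketch with simplified stand-in types. `SortExpr`, `SortRequirement`, and `satisfies` are hypothetical and use plain column names instead of physical expressions; the reading that a requirement with `sort_options: None` accepts any direction is an assumption based on the `Option` field seen in the quoted diff.

```rust
// Simplified stand-ins; the real DataFusion types wrap physical expressions.
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

// A concrete ordering: the direction is always fixed.
#[derive(Debug, Clone, PartialEq)]
struct SortExpr {
    column: String,
    options: SortOptions,
}

// A requirement: the direction may be left open.
#[derive(Debug, Clone, PartialEq)]
struct SortRequirement {
    column: String,
    // Assumption: `None` means any direction is acceptable.
    sort_options: Option<SortOptions>,
}

// Returns true if `provided` satisfies `required` as a prefix, column by column.
fn satisfies(provided: &[SortExpr], required: &[SortRequirement]) -> bool {
    if required.len() > provided.len() {
        return false;
    }
    required.iter().zip(provided).all(|(req, given)| {
        req.column == given.column
            && req.sort_options.map_or(true, |opts| opts == given.options)
    })
}
```

A requirement expressed this way can be met by either an ascending or a descending input, which a fixed `PhysicalSortExpr` slice cannot express.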




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1117181203


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2129,16 +2131,17 @@ async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()>
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
-            "        SortExec: expr=[c9@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
-            "              CoalesceBatchesExec: target_batch_size=8192",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  SortExec: expr=[c1@0 ASC NULLS LAST], global=false",

Review Comment:
   It seems that the final `SortPreservingMergeExec` in the plan is redundant. Its input already has a single partition. What do you think?
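
The redundancy condition raised here can be sketched as follows. This is a toy model loosely following the `analyze_immediate_spm_removal` check quoted earlier in the thread; `OutputInfo` and `merge_is_redundant` are hypothetical names, and orderings are compared as plain strings rather than through equivalence properties.

```rust
// Hypothetical summary of what a plan node produces; not a DataFusion type.
struct OutputInfo {
    partition_count: usize,
    // Output ordering, written as display strings for simplicity.
    ordering: Vec<String>,
}

// A sort-preserving merge adds nothing when its input is already ordered
// as required and has at most one partition.
fn merge_is_redundant(input: &OutputInfo, merge_exprs: &[String]) -> bool {
    let ordering_ok = input.ordering.len() >= merge_exprs.len()
        && input.ordering.iter().zip(merge_exprs).all(|(a, b)| a == b);
    ordering_ok && input.partition_count <= 1
}
```

Under this model the merge is kept whenever the input still has several partitions, even if each partition is individually sorted.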


